[PATCH v2 01/10] database: export virtual dirty bytes region group

14 views
Skip to first unread message

Glauber Costa

<glauber@scylladb.com>
unread,
Sep 14, 2016, 11:57:57 PM9/14/16
to scylladb-dev@googlegroups.com, Glauber Costa
Currently, we export the region group where memtables are placed as dirty bytes.
Upcoming patches will optimistically mark some bytes in this region as free, a
scheme we know as "virtual dirty".

We are still interested in knowing the real state of the dirty region, so we
will keep track of the bytes virtually freed and split the counters in two.

Signed-off-by: Glauber Costa <gla...@scylladb.com>
---
database.hh | 25 +++++++++++++++++++++++++
database.cc | 10 +++++++++-
2 files changed, 34 insertions(+), 1 deletion(-)

diff --git a/database.hh b/database.hh
index 318c1c3..904921f 100644
--- a/database.hh
+++ b/database.hh
@@ -74,6 +74,7 @@
#include <seastar/core/rwlock.hh>
#include <seastar/core/shared_future.hh>
#include "tracing/trace_state.hh"
+#include <boost/intrusive/parent_from_member.hpp>

class frozen_mutation;
class reconcilable_result;
@@ -133,6 +134,7 @@ class dirty_memory_manager: public logalloc::region_group_reclaimer {
// default values here.
size_t _concurrency;
semaphore _flush_serializer;
+ int64_t _dirty_bytes_released_pre_accounted = 0;

seastar::gate _waiting_flush_gate;
std::vector<shared_memtable> _pending_flushes;
@@ -156,6 +158,11 @@ class dirty_memory_manager: public logalloc::region_group_reclaimer {
, _region_group(&parent->_region_group, *this)
, _concurrency(concurrency)
, _flush_serializer(concurrency) {}
+
+ static dirty_memory_manager& from_region_group(logalloc::region_group *rg) {
+ return *(boost::intrusive::get_parent_from_member(rg, &dirty_memory_manager::_region_group));
+ }
+
logalloc::region_group& region_group() {
return _region_group;
}
@@ -164,6 +171,24 @@ class dirty_memory_manager: public logalloc::region_group_reclaimer {
return _region_group;
}

+ void revert_potentially_cleaned_up_memory(int64_t delta) {
+ _region_group.update(delta);
+ _dirty_bytes_released_pre_accounted -= delta;
+ }
+
+ void account_potentially_cleaned_up_memory(int64_t delta) {
+ _region_group.update(-delta);
+ _dirty_bytes_released_pre_accounted += delta;
+ }
+
+ size_t real_dirty_memory() const {
+ return _region_group.memory_used() + _dirty_bytes_released_pre_accounted;
+ }
+
+ size_t virtual_dirty_memory() const {
+ return _region_group.memory_used();
+ }
+
template <typename Func>
future<> serialize_flush(Func&& func) {
return seastar::with_gate(_waiting_flush_gate, [this, func] () mutable {
diff --git a/database.cc b/database.cc
index 3be36c7..3bcafe4 100644
--- a/database.cc
+++ b/database.cc
@@ -1566,7 +1566,15 @@ database::setup_collectd() {
, scollectd::per_cpu_plugin_instance
, "bytes", "dirty")
, scollectd::make_typed(scollectd::data_type::GAUGE, [this] {
- return dirty_memory_region_group().memory_used();
+ return _dirty_memory_manager.real_dirty_memory();
+ })));
+
+ _collectd.push_back(
+ scollectd::add_polled_metric(scollectd::type_instance_id("memory"
+ , scollectd::per_cpu_plugin_instance
+ , "bytes", "virtual_dirty")
+ , scollectd::make_typed(scollectd::data_type::GAUGE, [this] {
+ return _dirty_memory_manager.virtual_dirty_memory();
})));

_collectd.push_back(
--
2.5.5

Glauber Costa

<glauber@scylladb.com>
unread,
Sep 14, 2016, 11:57:57 PM9/14/16
to scylladb-dev@googlegroups.com, Glauber Costa
Available at:

To g...@github.com:glommer/scylla.git virtual-dirty-v2

Description:
============

Scylla currently suffers from a brick wall behavior of the request throttler.
Requests pile up until we reach the dirty memory limit, at which point we stop
serving them until we have freed enough memory to allow for more requests.

The problem is that freeing dirty memory means writing an SSTable to completion.
That can take a long time, even if we are blessed with great disks. Those long
waiting times can and will translate into timeouts. That is bad behavior.

What this patch does is introduce one form of virtual dirty memory accounting.
Instead of allowing 100 % of the dirty memory to be filled up until we stop
accepting requests, we will do that when we reach 50 % of memory. However,
instead of releasing requests only when an SSTable is fully written, we start
releasing them when some memory was written.

The practical effect of that, is that once we reach 50 % occupancy in our dirty
memory region, we will bring the system from CPU speed to disk speed, and will
start accepting requests only at the rate we are able to write memory back.

Results
=======

With this patchset running a load big enough to easily saturate the disk,
(commitlog disabled to highlight the effects of the memtable writer), I am able
to run scylla for many minutes, with timeouts occurring only when I run out of
disk space, whereas without this patch a swarm of timeouts would start merely 2
seconds after the load started - and would never get stable.


Changes from V1:
================
- Only used space is accounted for, and we don't account for padding, or free
space in the region. While this slow us down a bit, it is not significant and
it simplifies the code a lot - since we don't have to have extra protections
against compactions
- get rid of virtual functions
- account objects before they are transformed into a mutation fragment -
guaranteeing that they will be in the correct region.

Changes from RFC:
=================
- The accounting was moved to the reader side (mutation_reader), instead of being
done by at the SSTable side.
- Accounting of relevant internal structures (memtable_entry, rows entry) is more
precise.



Glauber Costa (10):
database: export virtual dirty bytes region group
LSA: export information about size of the throttle queue
LSA: export information about object memory footprint
sstables: use special reader for writing a memtable
memtables: split scanning reader in two
LSA: allow a group to query its own region group
move partition_snapshot_reader code to header file
allow looking at the range_tombstone_stream without converting to a
mutation_fragment
add accounting of memory read to partition_snapshot_reader
database: allow virtual dirty memory management

database.hh | 25 ++++++
memtable.hh | 5 +-
partition_version.hh | 200 ++++++++++++++++++++++++++++++++++++++++---
streamed_mutation.hh | 10 ++-
utils/allocation_strategy.hh | 9 ++
utils/logalloc.hh | 24 +++++-
database.cc | 46 +++++++++-
memtable.cc | 195 ++++++++++++++++++++++++++++++++++-------
partition_version.cc | 178 +-------------------------------------
sstables/sstables.cc | 2 +-
streamed_mutation.cc | 47 ++++++----
utils/logalloc.cc | 23 ++++-
12 files changed, 523 insertions(+), 241 deletions(-)

--
2.5.5

Glauber Costa

<glauber@scylladb.com>
unread,
Sep 14, 2016, 11:57:59 PM9/14/16
to scylladb-dev@googlegroups.com, Glauber Costa
Right now the special reader doesn't do much, but the idea is that we will
soon replace it will a reader that specializes in flush, and is in turn able
to provide read-side on-flush functionality like virtual dirty.

Signed-off-by: Glauber Costa <gla...@scylladb.com>
---
memtable.hh | 3 +++
memtable.cc | 5 +++++
sstables/sstables.cc | 2 +-
3 files changed, 9 insertions(+), 1 deletion(-)

diff --git a/memtable.hh b/memtable.hh
index c6f9960..98fc7eb 100644
--- a/memtable.hh
+++ b/memtable.hh
@@ -147,6 +147,9 @@ class memtable final : public enable_lw_shared_from_this<memtable>, private loga
const query::partition_slice& slice = query::full_slice,
const io_priority_class& pc = default_priority_class());

+
+ mutation_reader make_flush_reader(schema_ptr);
+
mutation_source as_data_source();
key_source as_key_source();

diff --git a/memtable.cc b/memtable.cc
index ac6b04a..93f3d02 100644
--- a/memtable.cc
+++ b/memtable.cc
@@ -217,6 +217,11 @@ memtable::make_reader(schema_ptr s,
}
}

+mutation_reader
+memtable::make_flush_reader(schema_ptr s) {
+ return make_mutation_reader<scanning_reader>(std::move(s), shared_from_this(), query::full_partition_range, query::no_clustering_key_filtering, default_priority_class());
+}
+
void
memtable::update(const db::replay_position& rp) {
if (_replay_position < rp) {
diff --git a/sstables/sstables.cc b/sstables/sstables.cc
index 5ae3039..4b71d08 100644
--- a/sstables/sstables.cc
+++ b/sstables/sstables.cc
@@ -1784,7 +1784,7 @@ void components_writer::consume_end_of_stream() {

future<> sstable::write_components(memtable& mt, bool backup, const io_priority_class& pc, bool leave_unsealed) {
_collector.set_replay_position(mt.replay_position());
- return write_components(mt.make_reader(mt.schema()),
+ return write_components(mt.make_flush_reader(mt.schema()),
mt.partition_count(), mt.schema(), std::numeric_limits<uint64_t>::max(), backup, pc, leave_unsealed);
}

--
2.5.5

Glauber Costa

<glauber@scylladb.com>
unread,
Sep 14, 2016, 11:57:59 PM9/14/16
to scylladb-dev@googlegroups.com, Glauber Costa
We allocate objects of a certain size, but we use a bit more memory to hold
them. To get a clerer picture about how much memory will an object cost us, we
need help from the allocator. This patch exports an interface that allow users
to query into a specific allocator to get that information.

Signed-off-by: Glauber Costa <gla...@scylladb.com>
---
utils/allocation_strategy.hh | 9 +++++++++
utils/logalloc.cc | 15 +++++++++++++--
2 files changed, 22 insertions(+), 2 deletions(-)

diff --git a/utils/allocation_strategy.hh b/utils/allocation_strategy.hh
index bc5613a..c223db1 100644
--- a/utils/allocation_strategy.hh
+++ b/utils/allocation_strategy.hh
@@ -97,6 +97,11 @@ class allocation_strategy {
// Doesn't invalidate references to objects allocated with this strategy.
virtual void free(void*) = 0;

+ // Returns the total memory size used by the allocator to host this object.
+ // This will be at least the size of the object itself, plus the overhead, if any,
+ // to represent the object.
+ virtual size_t object_memory_size_in_allocator(const void* obj) const noexcept = 0;
+
// Like alloc() but also constructs the object with a migrator using
// standard move semantics. Allocates respecting object's alignment
// requirement.
@@ -138,6 +143,10 @@ class standard_allocation_strategy : public allocation_strategy {
virtual void free(void* obj) override {
::free(obj);
}
+
+ virtual size_t object_memory_size_in_allocator(const void* obj) const noexcept {
+ return ::malloc_usable_size(const_cast<void *>(obj));
+ }
};

extern standard_allocation_strategy standard_allocation_strategy_instance;
diff --git a/utils/logalloc.cc b/utils/logalloc.cc
index c244324..17a81f4 100644
--- a/utils/logalloc.cc
+++ b/utils/logalloc.cc
@@ -517,7 +517,7 @@ class segment_pool {
segment* new_segment(region::impl* r);
segment_descriptor& descriptor(const segment*);
// Returns segment containing given object or nullptr.
- segment* containing_segment(void* obj) const;
+ segment* containing_segment(const void* obj) const;
void free_segment(segment*) noexcept;
void free_segment(segment*, segment_descriptor&) noexcept;
size_t segments_in_use() const;
@@ -720,7 +720,7 @@ segment_pool::descriptor(const segment* seg) {
}

segment*
-segment_pool::containing_segment(void* obj) const {
+segment_pool::containing_segment(const void* obj) const {
auto addr = reinterpret_cast<uintptr_t>(obj);
auto offset = addr & (segment::size - 1);
auto index = (addr - _segments_base) >> segment::size_shift;
@@ -1378,6 +1378,17 @@ class region_impl : public allocation_strategy {
}
}

+ virtual size_t object_memory_size_in_allocator(const void* obj) const noexcept override {
+ segment* seg = shard_segment_pool.containing_segment(obj);
+
+ if (!seg) {
+ return standard_allocator().object_memory_size_in_allocator(obj);
+ } else {
+ auto desc = reinterpret_cast<object_descriptor*>(reinterpret_cast<uintptr_t>(obj) - sizeof(object_descriptor));
+ return sizeof(object_descriptor) + desc->size();
+ }
+ }
+
// Merges another region into this region. The other region is made
// to refer to this region.
// Doesn't invalidate references to allocated objects.
--
2.5.5

Glauber Costa

<glauber@scylladb.com>
unread,
Sep 14, 2016, 11:58:00 PM9/14/16
to scylladb-dev@googlegroups.com, Glauber Costa
Signed-off-by: Glauber Costa <gla...@scylladb.com>
---
utils/logalloc.hh | 2 ++
utils/logalloc.cc | 8 ++++++++
2 files changed, 10 insertions(+)

diff --git a/utils/logalloc.hh b/utils/logalloc.hh
index e29f608..3eba296 100644
--- a/utils/logalloc.hh
+++ b/utils/logalloc.hh
@@ -582,6 +582,8 @@ class region {

allocation_strategy& allocator();

+ region_group* group();
+
// Merges another region into this region. The other region is left empty.
// Doesn't invalidate references to allocated objects.
void merge(region& other);
diff --git a/utils/logalloc.cc b/utils/logalloc.cc
index 17a81f4..ef85815 100644
--- a/utils/logalloc.cc
+++ b/utils/logalloc.cc
@@ -1293,6 +1293,10 @@ class region_impl : public allocation_strategy {
return total;
}

+ region_group* group() {
+ return _group;
+ }
+
occupancy_stats compactible_occupancy() const {
return _closed_occupancy;
}
@@ -1620,6 +1624,10 @@ occupancy_stats region::occupancy() const {
return _impl->occupancy();
}

+region_group* region::group() {
+ return _impl->group();
+}
+
void region::merge(region& other) {
if (_impl != other._impl) {
_impl->merge(*other._impl);
--
2.5.5

Glauber Costa

<glauber@scylladb.com>
unread,
Sep 14, 2016, 11:58:00 PM9/14/16
to scylladb-dev@googlegroups.com, Glauber Costa
Also add information about for how long has the oldest been sitting in the
queue. This is part of the backpressure work to allow us to throttle incoming
requests if we won't have memory to process them. Shortages can happen in all
sorts of places, and it is useful when designing and testing the solutions to
know where they are, and how bad they are.

Those counters are named for consistency after similar counters from transport/.

Signed-off-by: Glauber Costa <gla...@scylladb.com>
---
utils/logalloc.hh | 22 +++++++++++++++++++++-
database.cc | 18 ++++++++++++++++++
2 files changed, 39 insertions(+), 1 deletion(-)

diff --git a/utils/logalloc.hh b/utils/logalloc.hh
index db8ab35..e29f608 100644
--- a/utils/logalloc.hh
+++ b/utils/logalloc.hh
@@ -147,6 +147,7 @@ class region_group {
struct allocating_function {
virtual ~allocating_function() = default;
virtual void allocate() = 0;
+ virtual std::chrono::duration<double> queued_for() = 0;
};

template <typename Func>
@@ -154,11 +155,18 @@ class region_group {
using futurator = futurize<std::result_of_t<Func()>>;
typename futurator::promise_type pr;
Func func;
+ std::chrono::steady_clock::time_point queued;
public:
+ std::chrono::duration<double> queued_for() override {
+ auto delta = std::chrono::steady_clock::now() - queued;
+ return std::chrono::duration_cast<std::chrono::duration<double>>(delta);
+ }
+
void allocate() override {
futurator::apply(func).forward_to(std::move(pr));
}
- concrete_allocating_function(Func&& func) : func(std::forward<Func>(func)) {}
+ concrete_allocating_function(Func&& func) : func(std::forward<Func>(func))
+ , queued(std::chrono::steady_clock::now()) {}
typename futurator::type get_future() {
return pr.get_future();
}
@@ -318,6 +326,18 @@ class region_group {
_shutdown_requested = true;
return _asynchronous_gate.close();
}
+
+ size_t blocked_requests() {
+ return _blocked_requests.size();
+ }
+
+ float blocked_for() {
+ if (_blocked_requests.empty()) {
+ return 0.0;
+ }
+
+ return _blocked_requests.front()->queued_for().count();
+ }
private:
// Make sure we get a notification and can call release_requests when one of our ancestors that
// used to block us is no longer under memory pressure.
diff --git a/database.cc b/database.cc
index 3bcafe4..3252282 100644
--- a/database.cc
+++ b/database.cc
@@ -1585,6 +1585,24 @@ database::setup_collectd() {
));

_collectd.push_back(
+ scollectd::add_polled_metric(scollectd::type_instance_id("database"
+ , scollectd::per_cpu_plugin_instance
+ , "queue_length", "requests_blocked_memory")
+ , scollectd::make_typed(scollectd::data_type::GAUGE, [this] {
+ return _dirty_memory_manager.region_group().blocked_requests();
+ })
+ ));
+
+ _collectd.push_back(
+ scollectd::add_polled_metric(scollectd::type_instance_id("database"
+ , scollectd::per_cpu_plugin_instance
+ , "delay", "requests_blocked_memory")
+ , scollectd::make_typed(scollectd::data_type::GAUGE, [this] {
+ return _dirty_memory_manager.region_group().blocked_for();
+ })
+ ));
+
+ _collectd.push_back(
scollectd::add_polled_metric(scollectd::type_instance_id("memtables"
, scollectd::per_cpu_plugin_instance
, "bytes", "pending_flushes")
--
2.5.5

Glauber Costa

<glauber@scylladb.com>
unread,
Sep 14, 2016, 11:58:00 PM9/14/16
to scylladb-dev@googlegroups.com, Glauber Costa
The code that is common will live in its own reader, the iterator_reader. All
friendly private access to memtable attributes and methods happen through the
iterator reader.

After this patch, we are now left with the scanning_reader - same as always,
but now implemented on top of the iterator_reader, and a flush_reader, which
will be used by SSTable flushes only.

Signed-off-by: Glauber Costa <gla...@scylladb.com>
---
memtable.hh | 2 +-
memtable.cc | 127 ++++++++++++++++++++++++++++++++++++++++++++++--------------
2 files changed, 98 insertions(+), 31 deletions(-)

diff --git a/memtable.hh b/memtable.hh
index 98fc7eb..a0a5cbd 100644
--- a/memtable.hh
+++ b/memtable.hh
@@ -161,5 +161,5 @@ class memtable final : public enable_lw_shared_from_this<memtable>, private loga
return _replay_position;
}

- friend class scanning_reader;
+ friend class iterator_reader;
};
diff --git a/memtable.cc b/memtable.cc
index 93f3d02..3a1743b 100644
--- a/memtable.cc
+++ b/memtable.cc
@@ -103,7 +103,7 @@ memtable::slice(const query::partition_range& range) const {
}
}

-class scanning_reader final : public mutation_reader::impl {
+class iterator_reader: public mutation_reader::impl {
lw_shared_ptr<memtable> _memtable;
schema_ptr _schema;
const query::partition_range& _range;
@@ -112,11 +112,7 @@ class scanning_reader final : public mutation_reader::impl {
memtable::partitions_type::iterator _end;
uint64_t _last_reclaim_counter;
size_t _last_partition_count = 0;
- stdx::optional<query::partition_range> _delegate_range;
- mutation_reader _delegate;
- const io_priority_class& _pc;
- const query::partition_slice& _slice;
-private:
+
memtable::partitions_type::iterator lookup_end() {
auto cmp = memtable_entry::compare(_memtable->_schema);
return _range.end()
@@ -148,46 +144,117 @@ class scanning_reader final : public mutation_reader::impl {
}
_last_reclaim_counter = current_reclaim_counter;
}
-public:
- scanning_reader(schema_ptr s,
+protected:
+ iterator_reader(schema_ptr s,
lw_shared_ptr<memtable> m,
- const query::partition_range& range,
- const query::partition_slice& slice,
- const io_priority_class& pc)
+ const query::partition_range& range)
: _memtable(std::move(m))
, _schema(std::move(s))
, _range(range)
- , _pc(pc)
- , _slice(slice)
{ }

+ memtable_entry* fetch_next_entry() {
+ update_iterators();
+ if (_i == _end) {
+ return nullptr;
+ } else {
+ memtable_entry& e = *_i;
+ ++_i;
+ _last = e.key();
+ _memtable->upgrade_entry(e);
+ return &e;
+ }
+ }
+
+ logalloc::allocating_section& read_section() {
+ return _memtable->_read_section;
+ }
+
+ lw_shared_ptr<memtable> mtbl() {
+ return _memtable;
+ }
+
+ schema_ptr schema() {
+ return _schema;
+ }
+
+ logalloc::region& region() {
+ return *_memtable;
+ };
+
+ std::experimental::optional<query::partition_range> get_delegate_range() {
+ // We cannot run concurrently with row_cache::update().
+ if (_memtable->is_flushed()) {
+ return _last ? _range.split_after(*_last, dht::ring_position_comparator(*_memtable->_schema)) : _range;
+ }
+ return {};
+ }
+
+ mutation_reader delegate_reader(const query::partition_range& delegate,
+ const query::partition_slice& slice,
+ const io_priority_class& pc) {
+ auto ret = make_mutation_reader<sstable_range_wrapping_reader>(
+ _memtable->_sstable, _schema, delegate, slice, pc);
+ _memtable = {};
+ _last = {};
+ return ret;
+ }
+};
+
+class scanning_reader final: public iterator_reader {
+ stdx::optional<query::partition_range> _delegate_range;
+ mutation_reader _delegate;
+ const io_priority_class& _pc;
+ const query::partition_slice& _slice;
+public:
+ scanning_reader(schema_ptr s,
+ lw_shared_ptr<memtable> m,
+ const query::partition_range& range,
+ const query::partition_slice& slice,
+ const io_priority_class& pc)
+ : iterator_reader(std::move(s), std::move(m), range)
+ , _pc(pc)
+ , _slice(slice)
+ { }
+
virtual future<streamed_mutation_opt> operator()() override {
if (_delegate_range) {
return _delegate();
}

- // We cannot run concurrently with row_cache::update().
- if (_memtable->is_flushed()) {
- // FIXME: Use cache. See column_family::make_reader().
- _delegate_range = _last ? _range.split_after(*_last, dht::ring_position_comparator(*_memtable->_schema)) : _range;
- _delegate = make_mutation_reader<sstable_range_wrapping_reader>(
- _memtable->_sstable, _schema, *_delegate_range, _slice, _pc);
- _memtable = {};
- _last = {};
+ // FIXME: Use cache. See column_family::make_reader().
+ _delegate_range = get_delegate_range();
+ if (_delegate_range) {
+ _delegate = delegate_reader(*_delegate_range, _slice, _pc);
return _delegate();
}

- logalloc::reclaim_lock _(*_memtable);
+ logalloc::reclaim_lock _(region());
managed_bytes::linearization_context_guard lcg;
- update_iterators();
- if (_i == _end) {
+ memtable_entry* e = fetch_next_entry();
+ if (!e) {
+ return make_ready_future<streamed_mutation_opt>(stdx::nullopt);
+ } else {
+ return make_ready_future<streamed_mutation_opt>(e->read(mtbl(), schema(), _slice));
+ }
+ }
+};
+
+class flush_reader final : public iterator_reader {
+public:
+ flush_reader(schema_ptr s, lw_shared_ptr<memtable> m)
+ : iterator_reader(std::move(s), std::move(m), query::full_partition_range)
+ {}
+
+ virtual future<streamed_mutation_opt> operator()() override {
+ logalloc::reclaim_lock _(region());
+ managed_bytes::linearization_context_guard lcg;
+ memtable_entry* e = fetch_next_entry();
+ if (!e) {
return make_ready_future<streamed_mutation_opt>(stdx::nullopt);
+ } else {
+ return make_ready_future<streamed_mutation_opt>((*e).read(mtbl(), schema(), query::full_slice));
}
- memtable_entry& e = *_i;
- ++_i;
- _last = e.key();
- _memtable->upgrade_entry(e);
- return make_ready_future<streamed_mutation_opt>(e.read(_memtable, _schema, _slice));
}
};

@@ -219,7 +286,7 @@ memtable::make_reader(schema_ptr s,

mutation_reader
memtable::make_flush_reader(schema_ptr s) {
- return make_mutation_reader<scanning_reader>(std::move(s), shared_from_this(), query::full_partition_range, query::no_clustering_key_filtering, default_priority_class());
+ return make_mutation_reader<flush_reader>(std::move(s), shared_from_this());
}

void
--
2.5.5

Glauber Costa

<glauber@scylladb.com>
unread,
Sep 14, 2016, 11:58:02 PM9/14/16
to scylladb-dev@googlegroups.com, Glauber Costa
This is so we can template it without worrying about declaring the
specializations in the .cc file.

Signed-off-by: Glauber Costa <gla...@scylladb.com>
---
partition_version.hh | 172 +++++++++++++++++++++++++++++++++++++++++++++----
partition_version.cc | 178 ---------------------------------------------------
2 files changed, 161 insertions(+), 189 deletions(-)

diff --git a/partition_version.hh b/partition_version.hh
index 45a3e13..6c57ab0 100644
--- a/partition_version.hh
+++ b/partition_version.hh
@@ -311,23 +311,173 @@ class partition_snapshot_reader : public streamed_mutation::impl {
uint64_t _reclaim_counter;
unsigned _version_count = 0;
private:
- void refresh_iterators();
- void pop_clustering_row();
+ void refresh_iterators() {
+ _clustering_rows.clear();

- mutation_fragment_opt read_static_row();
- mutation_fragment_opt read_next();
- void do_fill_buffer();
- static tombstone tomb(partition_snapshot& snp);
+ if (!_in_ck_range && _current_ck_range == _ck_range_end) {
+ return;
+ }
+
+ for (auto&& v : _snapshot->versions()) {
+ auto cr_end = v.partition().upper_bound(*_schema, *_current_ck_range);
+ auto cr = [&] () -> mutation_partition::rows_type::const_iterator {
+ if (_in_ck_range) {
+ return v.partition().clustered_rows().upper_bound(*_last_entry, _cmp);
+ } else {
+ return v.partition().lower_bound(*_schema, *_current_ck_range);
+ }
+ }();
+
+ if (cr != cr_end) {
+ _clustering_rows.emplace_back(rows_position { cr, cr_end });
+ }
+ }
+
+ _in_ck_range = true;
+ boost::range::make_heap(_clustering_rows, heap_compare(_cmp));
+ }
+
+ void pop_clustering_row() {
+ auto& current = _clustering_rows.back();
+ current._position = std::next(current._position);
+ if (current._position == current._end) {
+ _clustering_rows.pop_back();
+ } else {
+ boost::range::push_heap(_clustering_rows, heap_compare(_cmp));
+ }
+ }
+
+ mutation_fragment_opt read_static_row() {
+ _last_entry = position_in_partition(position_in_partition::static_row_tag_t());
+ mutation_fragment_opt sr;
+ for (auto&& v : _snapshot->versions()) {
+ if (!v.partition().static_row().empty()) {
+ if (!sr) {
+ sr = mutation_fragment(static_row(v.partition().static_row()));
+ } else {
+ sr->as_static_row().apply(*_schema, v.partition().static_row());
+ }
+ }
+ }
+ return sr;
+ }
+
+ mutation_fragment_opt read_next() {
+ if (!_clustering_rows.empty()) {
+ auto mf = _range_tombstones.get_next(*_clustering_rows.front()._position);
+ if (mf) {
+ return mf;
+ }
+
+ boost::range::pop_heap(_clustering_rows, heap_compare(_cmp));
+ clustering_row result = *_clustering_rows.back()._position;
+ pop_clustering_row();
+ while (!_clustering_rows.empty() && _eq(*_clustering_rows.front()._position, result)) {
+ boost::range::pop_heap(_clustering_rows, heap_compare(_cmp));
+ auto& current = _clustering_rows.back();
+ result.apply(*_schema, *current._position);
+ pop_clustering_row();
+ }
+ _last_entry = result.position();
+ return mutation_fragment(std::move(result));
+ }
+ return _range_tombstones.get_next();
+ }
+
+ void do_fill_buffer() {
+ if (!_last_entry) {
+ auto mfopt = read_static_row();
+ if (mfopt) {
+ _buffer.emplace_back(std::move(*mfopt));
+ }
+ }
+
+ if (!_in_ck_range || _lsa_region.reclaim_counter() != _reclaim_counter || _snapshot->version_count() != _version_count) {
+ refresh_iterators();
+ _reclaim_counter = _lsa_region.reclaim_counter();
+ _version_count = _snapshot->version_count();
+ }
+
+ while (!is_end_of_stream() && !is_buffer_full()) {
+ if (_in_ck_range && _clustering_rows.empty()) {
+ _in_ck_range = false;
+ _current_ck_range = std::next(_current_ck_range);
+ refresh_iterators();
+ continue;
+ }
+
+ auto mfopt = read_next();
+ if (mfopt) {
+ _buffer.emplace_back(std::move(*mfopt));
+ } else {
+ _end_of_stream = true;
+ }
+ }
+ }
+
+ static tombstone tomb(partition_snapshot& snp) {
+ tombstone t;
+ for (auto& v : snp.versions()) {
+ t.apply(v.partition().partition_tombstone());
+ }
+ return t;
+ }
public:
partition_snapshot_reader(schema_ptr s, dht::decorated_key dk, lw_shared_ptr<partition_snapshot> snp,
query::clustering_key_filter_ranges crr,
logalloc::region& region, logalloc::allocating_section& read_section,
- boost::any pointer_to_container);
- ~partition_snapshot_reader();
- virtual future<> fill_buffer() override;
+ boost::any pointer_to_container)
+ : streamed_mutation::impl(s, std::move(dk), tomb(*snp))
+ , _container_guard(std::move(pointer_to_container))
+ , _ck_ranges(std::move(crr))
+ , _current_ck_range(_ck_ranges.begin())
+ , _ck_range_end(_ck_ranges.end())
+ , _cmp(*s)
+ , _eq(*s)
+ , _snapshot(snp)
+ , _range_tombstones(*s)
+ , _lsa_region(region)
+ , _read_section(read_section) {
+ for (auto&& v : _snapshot->versions()) {
+ _range_tombstones.apply(v.partition().row_tombstones());
+ }
+ do_fill_buffer();
+ }
+
+ ~partition_snapshot_reader() {
+ if (!_snapshot.owned()) {
+ return;
+ }
+ // If no one else is using this particular snapshot try to merge partition
+ // versions.
+ with_allocator(_lsa_region.allocator(), [this] {
+ return with_linearized_managed_bytes([this] {
+ try {
+ _read_section(_lsa_region, [this] {
+ _snapshot->merge_partition_versions();
+ _snapshot = {};
+ });
+ } catch (...) { }
+ });
+ });
+ }
+
+ virtual future<> fill_buffer() override {
+ return _read_section(_lsa_region, [&] {
+ return with_linearized_managed_bytes([&] {
+ do_fill_buffer();
+ return make_ready_future<>();
+ });
+ });
+ }
};

-streamed_mutation make_partition_snapshot_reader(schema_ptr s, dht::decorated_key dk,
+inline streamed_mutation
+make_partition_snapshot_reader(schema_ptr s, dht::decorated_key dk,
query::clustering_key_filter_ranges crr,
lw_shared_ptr<partition_snapshot> snp, logalloc::region& region,
- logalloc::allocating_section& read_section, boost::any pointer_to_container);
+ logalloc::allocating_section& read_section, boost::any pointer_to_container)
+{
+ return make_streamed_mutation<partition_snapshot_reader>(s, std::move(dk),
+ snp, std::move(crr), region, read_section, std::move(pointer_to_container), no_accounter);
+}
diff --git a/partition_version.cc b/partition_version.cc
index 0a92a14..e61acde 100644
--- a/partition_version.cc
+++ b/partition_version.cc
@@ -291,181 +291,3 @@ lw_shared_ptr<partition_snapshot> partition_entry::read(schema_ptr entry_schema)
return snp;
}
}
-
-partition_snapshot_reader::partition_snapshot_reader(schema_ptr s, dht::decorated_key dk,
- lw_shared_ptr<partition_snapshot> snp,
- query::clustering_key_filter_ranges crr, logalloc::region& region,
- logalloc::allocating_section& read_section, boost::any pointer_to_container)
- : streamed_mutation::impl(s, std::move(dk), tomb(*snp))
- , _container_guard(std::move(pointer_to_container))
- , _ck_ranges(std::move(crr))
- , _current_ck_range(_ck_ranges.begin())
- , _ck_range_end(_ck_ranges.end())
- , _cmp(*s)
- , _eq(*s)
- , _snapshot(snp)
- , _range_tombstones(*s)
- , _lsa_region(region)
- , _read_section(read_section)
-{
- for (auto&& v : _snapshot->versions()) {
- _range_tombstones.apply(v.partition().row_tombstones());
- }
- do_fill_buffer();
-}
-
-partition_snapshot_reader::~partition_snapshot_reader()
-{
- if (!_snapshot.owned()) {
- return;
- }
- // If no one else is using this particular snapshot try to merge partition
- // versions.
- with_allocator(_lsa_region.allocator(), [this] {
- return with_linearized_managed_bytes([this] {
- try {
- _read_section(_lsa_region, [this] {
- _snapshot->merge_partition_versions();
- _snapshot = {};
- });
- } catch (...) { }
- });
- });
-}
-
-tombstone partition_snapshot_reader::tomb(partition_snapshot& snp)
-{
- tombstone t;
- for (auto& v : snp.versions()) {
- t.apply(v.partition().partition_tombstone());
- }
- return t;
-}
-
-mutation_fragment_opt partition_snapshot_reader::read_static_row()
-{
- _last_entry = position_in_partition(position_in_partition::static_row_tag_t());
- mutation_fragment_opt sr;
- for (auto&& v : _snapshot->versions()) {
- if (!v.partition().static_row().empty()) {
- if (!sr) {
- sr = mutation_fragment(static_row(v.partition().static_row()));
- } else {
- sr->as_static_row().apply(*_schema, v.partition().static_row());
- }
- }
- }
- return sr;
-}
-
-void partition_snapshot_reader::refresh_iterators()
-{
- _clustering_rows.clear();
-
- if (!_in_ck_range && _current_ck_range == _ck_range_end) {
- return;
- }
-
- for (auto&& v : _snapshot->versions()) {
- auto cr_end = v.partition().upper_bound(*_schema, *_current_ck_range);
- auto cr = [&] () -> mutation_partition::rows_type::const_iterator {
- if (_in_ck_range) {
- return v.partition().clustered_rows().upper_bound(*_last_entry, _cmp);
- } else {
- return v.partition().lower_bound(*_schema, *_current_ck_range);
- }
- }();
-
- if (cr != cr_end) {
- _clustering_rows.emplace_back(rows_position { cr, cr_end });
- }
- }
-
- _in_ck_range = true;
- boost::range::make_heap(_clustering_rows, heap_compare(_cmp));
-}
-
-void partition_snapshot_reader::pop_clustering_row()
-{
- auto& current = _clustering_rows.back();
- current._position = std::next(current._position);
- if (current._position == current._end) {
- _clustering_rows.pop_back();
- } else {
- boost::range::push_heap(_clustering_rows, heap_compare(_cmp));
- }
-}
-
-mutation_fragment_opt partition_snapshot_reader::read_next()
-{
- if (!_clustering_rows.empty()) {
- auto mf = _range_tombstones.get_next(*_clustering_rows.front()._position);
- if (mf) {
- return mf;
- }
-
- boost::range::pop_heap(_clustering_rows, heap_compare(_cmp));
- clustering_row result = *_clustering_rows.back()._position;
- pop_clustering_row();
- while (!_clustering_rows.empty() && _eq(*_clustering_rows.front()._position, result)) {
- boost::range::pop_heap(_clustering_rows, heap_compare(_cmp));
- auto& current = _clustering_rows.back();
- result.apply(*_schema, *current._position);
- pop_clustering_row();
- }
- _last_entry = result.position();
- return mutation_fragment(std::move(result));
- }
- return _range_tombstones.get_next();
-}
-
-void partition_snapshot_reader::do_fill_buffer()
-{
- if (!_last_entry) {
- auto mfopt = read_static_row();
- if (mfopt) {
- _buffer.emplace_back(std::move(*mfopt));
- }
- }
-
- if (!_in_ck_range || _lsa_region.reclaim_counter() != _reclaim_counter || _snapshot->version_count() != _version_count) {
- refresh_iterators();
- _reclaim_counter = _lsa_region.reclaim_counter();
- _version_count = _snapshot->version_count();
- }
-
- while (!is_end_of_stream() && !is_buffer_full()) {
- if (_in_ck_range && _clustering_rows.empty()) {
- _in_ck_range = false;
- _current_ck_range = std::next(_current_ck_range);
- refresh_iterators();
- continue;
- }
-
- auto mfopt = read_next();
- if (mfopt) {
- _buffer.emplace_back(std::move(*mfopt));
- } else {
- _end_of_stream = true;
- }
- }
-}
-
-future<> partition_snapshot_reader::fill_buffer()
-{
- return _read_section(_lsa_region, [&] {
- return with_linearized_managed_bytes([&] {
- do_fill_buffer();
- return make_ready_future<>();
- });
- });
-}
-
-streamed_mutation make_partition_snapshot_reader(schema_ptr s, dht::decorated_key dk,
- query::clustering_key_filter_ranges crr,
- lw_shared_ptr<partition_snapshot> snp, logalloc::region& region,
- logalloc::allocating_section& read_section, boost::any pointer_to_container)
-{
- return make_streamed_mutation<partition_snapshot_reader>(s, std::move(dk),
- snp, std::move(crr), region, read_section, std::move(pointer_to_container));
-}
--
2.5.5

Glauber Costa

<glauber@scylladb.com>
unread,
Sep 14, 2016, 11:58:02 PM9/14/16
to scylladb-dev@googlegroups.com, Glauber Costa
Currently, code interacting with range_tombstone_streams uses the get_next() API
to convert the next available tombstone in the list to a mutation fragment.

In order to properly account the range tombstone, it is desirable to do it
earlier, since the creation of the mutation fragment may move the data to a
different region.

This patch introduces the peek_next() API, which allows one to obtain a pointer
to the next element in the list, and implements the get_next() API in terms of
it.

Signed-off-by: Glauber Costa <gla...@scylladb.com>
---
partition_version.hh | 9 +++++----
streamed_mutation.hh | 10 +++++++++-
streamed_mutation.cc | 47 ++++++++++++++++++++++++++++++++---------------
3 files changed, 46 insertions(+), 20 deletions(-)

diff --git a/partition_version.hh b/partition_version.hh
index 6c57ab0..da27021 100644
--- a/partition_version.hh
+++ b/partition_version.hh
@@ -364,9 +364,9 @@ class partition_snapshot_reader : public streamed_mutation::impl {

mutation_fragment_opt read_next() {
if (!_clustering_rows.empty()) {
- auto mf = _range_tombstones.get_next(*_clustering_rows.front()._position);
- if (mf) {
- return mf;
+ auto rt = _range_tombstones.peek_next(*_clustering_rows.front()._position);
+ if (rt) {
+ return _range_tombstones.get_next(rt);
}

boost::range::pop_heap(_clustering_rows, heap_compare(_cmp));
@@ -381,7 +381,8 @@ class partition_snapshot_reader : public streamed_mutation::impl {
_last_entry = result.position();
return mutation_fragment(std::move(result));
}
- return _range_tombstones.get_next();
+ auto rt = _range_tombstones.peek_next();
+ return _range_tombstones.get_next(rt);
}

void do_fill_buffer() {
diff --git a/streamed_mutation.hh b/streamed_mutation.hh
index adb8b17..502e454 100644
--- a/streamed_mutation.hh
+++ b/streamed_mutation.hh
@@ -550,12 +550,20 @@ class range_tombstone_stream {
range_tombstone_list _list;
bool _inside_range_tombstone = false;
private:
- mutation_fragment_opt do_get_next();
+ mutation_fragment_opt do_get_next(range_tombstone& rt);
public:
range_tombstone_stream(const schema& s) : _schema(s), _cmp(s), _list(s) { }
+
+ range_tombstone* peek_next(const rows_entry&);
+ range_tombstone* peek_next(const mutation_fragment&);
+ range_tombstone* peek_next();
+
mutation_fragment_opt get_next(const rows_entry&);
mutation_fragment_opt get_next(const mutation_fragment&);
mutation_fragment_opt get_next();
+ mutation_fragment_opt get_next(range_tombstone* rt) {
+ return rt ? do_get_next(*rt) : mutation_fragment_opt();
+ }

void apply(range_tombstone&& rt) {
_list.apply(_schema, std::move(rt));
diff --git a/streamed_mutation.cc b/streamed_mutation.cc
index 0149b39..eceb01e 100644
--- a/streamed_mutation.cc
+++ b/streamed_mutation.cc
@@ -348,37 +348,54 @@ streamed_mutation merge_mutations(std::vector<streamed_mutation> ms)
return make_streamed_mutation<mutation_merger>(ms.back().schema(), ms.back().decorated_key(), std::move(ms));
}

-mutation_fragment_opt range_tombstone_stream::do_get_next()
+range_tombstone* range_tombstone_stream::peek_next() {
+ if (!_list.empty()) {
+ return &*_list.tombstones().begin();
+ }
+ return nullptr;
+}
+
+range_tombstone* range_tombstone_stream::peek_next(const rows_entry& re) {
+ if (_list.empty() || _cmp(re, _list.begin()->start_bound())) {
+ return nullptr;
+ } else {
+ return &*_list.tombstones().begin();
+ }
+}
+
+range_tombstone* range_tombstone_stream::peek_next(const mutation_fragment& mf) {
+ if (_list.empty() || _cmp(mf, *_list.begin())) {
+ return nullptr;
+ } else {
+ return &*_list.tombstones().begin();
+ }
+}
+
+mutation_fragment_opt range_tombstone_stream::do_get_next(range_tombstone& rt)
{
- auto& rt = *_list.tombstones().begin();
+ auto it = _list.tombstones().iterator_to(rt);
+ _list.tombstones().erase(it);
auto mf = mutation_fragment(std::move(rt));
- _list.tombstones().erase(_list.begin());
current_deleter<range_tombstone>()(&rt);
return mf;
}

mutation_fragment_opt range_tombstone_stream::get_next(const rows_entry& re)
{
- if (!_list.empty()) {
- return !_cmp(re, _list.begin()->start_bound()) ? do_get_next() : mutation_fragment_opt();
- }
- return { };
+ auto rt = peek_next(re);
+ return rt ? do_get_next(*rt) : mutation_fragment_opt();
}

mutation_fragment_opt range_tombstone_stream::get_next(const mutation_fragment& mf)
{
- if (!_list.empty()) {
- return !_cmp(mf, *_list.begin()) ? do_get_next() : mutation_fragment_opt();
- }
- return { };
+ auto rt = peek_next(mf);
+ return rt ? do_get_next(*rt) : mutation_fragment_opt();
}

mutation_fragment_opt range_tombstone_stream::get_next()
{
- if (!_list.empty()) {
- return do_get_next();
- }
- return { };
+ auto rt = peek_next();
+ return rt ? do_get_next(*rt) : mutation_fragment_opt();
}

streamed_mutation reverse_streamed_mutation(streamed_mutation sm) {
--
2.5.5

Glauber Costa

<glauber@scylladb.com>
unread,
Sep 14, 2016, 11:58:03 PM9/14/16
to scylladb-dev@googlegroups.com, Glauber Costa
By default, we don't do any accounting. By specializing this class and providing
an accounter class, we can account how much memory are we reading as we read
through the elements.

Signed-off-by: Glauber Costa <gla...@scylladb.com>
---
partition_version.hh | 33 ++++++++++++++++++++++++++++++---
partition_version.cc | 2 ++
2 files changed, 32 insertions(+), 3 deletions(-)

diff --git a/partition_version.hh b/partition_version.hh
index da27021..6ab71ea 100644
--- a/partition_version.hh
+++ b/partition_version.hh
@@ -271,6 +271,14 @@ inline partition_version_ref& partition_snapshot::version()
}
}

+struct partition_snapshot_reader_dummy_accounter {
+ void account_component(const rows_entry& e) {}
+ void account_component(const range_tombstone& e) {}
+ void account_component(const row& e) {}
+};
+extern partition_snapshot_reader_dummy_accounter no_accounter;
+
+template <typename MemoryAccounter = partition_snapshot_reader_dummy_accounter>
class partition_snapshot_reader : public streamed_mutation::impl {
struct rows_position {
mutation_partition::rows_type::const_iterator _position;
@@ -307,6 +315,7 @@ class partition_snapshot_reader : public streamed_mutation::impl {

logalloc::region& _lsa_region;
logalloc::allocating_section& _read_section;
+ MemoryAccounter& _mem_accounter;

uint64_t _reclaim_counter;
unsigned _version_count = 0;
@@ -329,6 +338,7 @@ class partition_snapshot_reader : public streamed_mutation::impl {
}();

if (cr != cr_end) {
+ _mem_accounter.account_component(*cr);
_clustering_rows.emplace_back(rows_position { cr, cr_end });
}
}
@@ -352,6 +362,7 @@ class partition_snapshot_reader : public streamed_mutation::impl {
mutation_fragment_opt sr;
for (auto&& v : _snapshot->versions()) {
if (!v.partition().static_row().empty()) {
+ _mem_accounter.account_component(v.partition().static_row());
if (!sr) {
sr = mutation_fragment(static_row(v.partition().static_row()));
} else {
@@ -366,6 +377,7 @@ class partition_snapshot_reader : public streamed_mutation::impl {
if (!_clustering_rows.empty()) {
auto rt = _range_tombstones.peek_next(*_clustering_rows.front()._position);
if (rt) {
+ _mem_accounter.account_component(*rt);
return _range_tombstones.get_next(rt);
}

@@ -382,6 +394,9 @@ class partition_snapshot_reader : public streamed_mutation::impl {
return mutation_fragment(std::move(result));
}
auto rt = _range_tombstones.peek_next();
+ if (rt) {
+ _mem_accounter.account_component(*rt);
+ }
return _range_tombstones.get_next(rt);
}

@@ -427,7 +442,7 @@ class partition_snapshot_reader : public streamed_mutation::impl {
partition_snapshot_reader(schema_ptr s, dht::decorated_key dk, lw_shared_ptr<partition_snapshot> snp,
query::clustering_key_filter_ranges crr,
logalloc::region& region, logalloc::allocating_section& read_section,
- boost::any pointer_to_container)
+ boost::any pointer_to_container, MemoryAccounter& acct)
: streamed_mutation::impl(s, std::move(dk), tomb(*snp))
, _container_guard(std::move(pointer_to_container))
, _ck_ranges(std::move(crr))
@@ -438,7 +453,8 @@ class partition_snapshot_reader : public streamed_mutation::impl {
, _snapshot(snp)
, _range_tombstones(*s)
, _lsa_region(region)
- , _read_section(read_section) {
+ , _read_section(read_section)
+ , _mem_accounter(acct) {
for (auto&& v : _snapshot->versions()) {
_range_tombstones.apply(v.partition().row_tombstones());
}
@@ -473,12 +489,23 @@ class partition_snapshot_reader : public streamed_mutation::impl {
}
};

+template <typename MemoryAccounter>
+inline streamed_mutation
+make_partition_snapshot_reader(schema_ptr s, dht::decorated_key dk,
+ query::clustering_key_filter_ranges crr,
+ lw_shared_ptr<partition_snapshot> snp, logalloc::region& region,
+ logalloc::allocating_section& read_section, boost::any pointer_to_container, MemoryAccounter& acct)
+{
+ return make_streamed_mutation<partition_snapshot_reader<MemoryAccounter>>(s, std::move(dk),
+ snp, std::move(crr), region, read_section, std::move(pointer_to_container), acct);
+}
+
inline streamed_mutation
make_partition_snapshot_reader(schema_ptr s, dht::decorated_key dk,
query::clustering_key_filter_ranges crr,
lw_shared_ptr<partition_snapshot> snp, logalloc::region& region,
logalloc::allocating_section& read_section, boost::any pointer_to_container)
{
- return make_streamed_mutation<partition_snapshot_reader>(s, std::move(dk),
+ return make_streamed_mutation<partition_snapshot_reader<>>(s, std::move(dk),
snp, std::move(crr), region, read_section, std::move(pointer_to_container), no_accounter);
}
diff --git a/partition_version.cc b/partition_version.cc
index e61acde..1450645 100644
--- a/partition_version.cc
+++ b/partition_version.cc
@@ -291,3 +291,5 @@ lw_shared_ptr<partition_snapshot> partition_entry::read(schema_ptr entry_schema)
return snp;
}
}
+
+partition_snapshot_reader_dummy_accounter no_accounter;
--
2.5.5

Glauber Costa

<glauber@scylladb.com>
unread,
Sep 14, 2016, 11:58:04 PM9/14/16
to scylladb-dev@googlegroups.com, Glauber Costa
Scylla currently suffers from a brick wall behavior of the request throttler.
Requests pile up until we reach the dirty memory limit, at which point we stop
serving them until we have freed enough memory to allow for more requests.

The problem is that freeing dirty memory means writing an SSTable to completion.
That can take a long time, even if we are blessed with great disks. Those long
waiting times can and will translate into timeouts. That is bad behavior.

What this patch does is introduce one form of virtual dirty memory accounting.
Instead of allowing 100 % of the dirty memory to be filled up until we stop
accepting requests, we will do that when we reach 50 % of memory. However,
instead of releasing requests only when an SSTable is fully written, we start
releasing them when some memory was written.

The practical effect of that is that once we reach 50 % occupancy in our dirty
memory region, we will bring the system from CPU speed to disk speed, and will
start accepting requests only at the rate we are able to write memory back.

Signed-off-by: Glauber Costa <gla...@scylladb.com>
---
database.cc | 18 +++++++++++++++--
memtable.cc | 67 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
2 files changed, 82 insertions(+), 3 deletions(-)

diff --git a/database.cc b/database.cc
index 3252282..fb8e571 100644
--- a/database.cc
+++ b/database.cc
@@ -1548,8 +1548,22 @@ database::database(const db::config& cfg)
// in a different region group. This is because throttled requests are serviced in FIFO order,
// and we don't want system requests to be waiting for a long time behind user requests.
, _system_dirty_memory_manager(*this, _memtable_total_space + (10 << 20))
- , _dirty_memory_manager(*this, &_system_dirty_memory_manager, _memtable_total_space)
- , _streaming_dirty_memory_manager(*this, &_dirty_memory_manager, _streaming_memtable_total_space)
+ // The total space that can be used by memtables is _memtable_total_space, but we will only
+ // allow the region_group to grow to half of that. This is because of virtual_dirty: memtables
+ // can take a long time to flush, and if we are using the maximum amount of memory possible,
+ // then requests will block until we finish flushing at least one memtable.
+ //
+ // We can free memory until the whole memtable is flushed because we need to keep it in memory
+ // until the end, but we can fake freeing memory. When we are done with an element of the
+ // memtable, we will update the region group pretending memory just went down by that amount.
+ //
+ // Because the amount of memory that we pretend to free should be close enough to the actual
+ // memory used by the memtables, that effectively creates two sub-regions inside the dirty
+ // region group, of equal size. In the worst case, we will have _memtable_total_space dirty
+ // bytes used, and half of that already virtually freed.
+ , _dirty_memory_manager(*this, &_system_dirty_memory_manager, _memtable_total_space / 2)
+ // The same goes for streaming in respect to virtual dirty.
+ , _streaming_dirty_memory_manager(*this, &_dirty_memory_manager, _streaming_memtable_total_space / 2)
, _version(empty_version)
, _enable_incremental_backups(cfg.incremental_backups())
{
diff --git a/memtable.cc b/memtable.cc
index 3a1743b..7581001 100644
--- a/memtable.cc
+++ b/memtable.cc
@@ -20,6 +20,7 @@
*/

#include "memtable.hh"
+#include "database.hh"
#include "frozen_mutation.hh"
#include "sstable_mutation_readers.hh"

@@ -240,11 +241,71 @@ class scanning_reader final: public iterator_reader {
}
};

+class flush_memory_accounter {
+ uint64_t _bytes_read = 0;
+ logalloc::region& _region;
+
+ template <typename Func>
+ void update_bytes_read(Func&& f) {
+ // that's so we can catch f by reference.
+ static_assert(!is_future<std::result_of_t<Func()>>::value, "future-returning functions are not permitted.");
+ auto delta = with_allocator(_region.allocator(), [&f] () -> uint64_t {
+ return f();
+ });
+ _bytes_read += delta;
+ dirty_memory_manager::from_region_group(_region.group()).account_potentially_cleaned_up_memory(delta);
+ }
+public:
+ explicit flush_memory_accounter(logalloc::region& region)
+ : _region(region)
+ {}
+
+ ~flush_memory_accounter() {
+ assert(_bytes_read <= _region.occupancy().used_space());
+ dirty_memory_manager::from_region_group(_region.group()).revert_potentially_cleaned_up_memory(_bytes_read);
+ }
+
+
+ void account_component(const range_tombstone& rt) {
+ update_bytes_read([&rt] {
+ return rt.memory_usage();
+ });
+ }
+
+ void account_component(const row& sr) {
+ update_bytes_read([&sr] {
+ return sr.memory_usage();
+ });
+ }
+
+ void account_component(const rows_entry& e) {
+ update_bytes_read([&e] {
+ return current_allocator().object_memory_size_in_allocator(&const_cast<rows_entry&>(e)) +
+ e.key().memory_usage() +
+ e.row().cells().memory_usage();
+ });
+ }
+
+ void account_component(memtable_entry& e) {
+ update_bytes_read([this, &e] {
+ return current_allocator().object_memory_size_in_allocator(&e) +
+ current_allocator().object_memory_size_in_allocator(&*(partition_snapshot(e.schema(), &(e.partition())).version())) +
+ e.key().key().memory_usage();
+ });
+ }
+};
+
class flush_reader final : public iterator_reader {
+ flush_memory_accounter _flushed_memory;
public:
flush_reader(schema_ptr s, lw_shared_ptr<memtable> m)
: iterator_reader(std::move(s), std::move(m), query::full_partition_range)
+ , _flushed_memory(region())
{}
+ flush_reader(const flush_reader&) = delete;
+ flush_reader(flush_reader&&) = delete;
+ flush_reader& operator=(flush_reader&&) = delete;
+ flush_reader& operator=(const flush_reader&) = delete;

virtual future<streamed_mutation_opt> operator()() override {
logalloc::reclaim_lock _(region());
@@ -253,7 +314,11 @@ class flush_reader final : public iterator_reader {
if (!e) {
return make_ready_future<streamed_mutation_opt>(stdx::nullopt);
} else {
- return make_ready_future<streamed_mutation_opt>((*e).read(mtbl(), schema(), query::full_slice));
+ auto cr = query::clustering_key_filter_ranges::get_ranges(*schema(), query::full_slice, e->key().key());
+ auto snp = e->partition().read(schema());
+ auto mpsr = make_partition_snapshot_reader(schema(), e->key(), std::move(cr), snp, region(), read_section(), mtbl(), _flushed_memory);
+ _flushed_memory.account_component(*e);
+ return make_ready_future<streamed_mutation_opt>(std::move(mpsr));
}
}
};
--
2.5.5

Glauber Costa

<glauber@scylladb.com>
unread,
Sep 15, 2016, 12:04:38 AM9/15/16
to scylladb-dev, Glauber Costa
I would like to share some graphs generated without (first-*) and with (second-*) this patchset applied.

In both cases, the server has 2 shards with 2 GB RAM each, and a loader writes partitions with a single column
that is 64KB in size. The contents should be self-explanatory, but please do let me know if anything is not clear.

The most interesting graph is the memory graph, where I am tracking virtual dirty (total, average, and max per shard - since all it takes is one shard hitting the max for us to time out). We will see in that graph that where we used to hit the wall and have requests waiting for multiple seconds - causing the timeouts - now we will have more requests waiting for less than tens of milliseconds, resulting in a smoother behavior.
second-memtable-io.png
first-compaction-io.png
first-running-compactions.png
first-timeouts.png
first-requests.png
second-compaction-io.png
second-memory.png
second-compaction.png
second-timeout.png
second-requests.png.png
second-requests.png
first-memory.png
first-memtable-io.png

Glauber Costa

<glauber@scylladb.com>
unread,
Sep 15, 2016, 12:16:07 AM9/15/16
to scylladb-dev, Glauber Costa
One note about those graphs: there are two graphs for "second-requests". In one of them - with timing consistent with the others, I have capped this to an initial period comparable to the first run.

The other shows the run going on until my disk finally screams out of space. The disk gets progressively slower as it gets fuller. which I believe is normal for SSDs. I have been consistently running fstrim after I get my disk full, otherwise the next runs also get slower as well, so that seems to me like a normal thing. Still, do let me know if you think this is somehow unexpected.

The disk graph shows the overall bandwidth getting lower, and both the memtable and compaction classes gets slower without any other competition (sorry I forgot to add those graphs before, adding here). Still I see no timeouts.
second-classes.png
second-disk.png

Avi Kivity

<avi@scylladb.com>
unread,
Sep 15, 2016, 3:24:18 AM9/15/16
to Glauber Costa, scylladb-dev
It is normal, but the effect is unexpected for sequential 128kB writes.  Maybe your disk has really small buffers.  I would expect it to gather a full erase block worth of new data and then erase and overwrite, eliminating any write amplification.

Unfortunately Linux doesn't support TRIM well enough today.

--
You received this message because you are subscribed to the Google Groups "ScyllaDB development" group.
To unsubscribe from this group and stop receiving emails from it, send an email to scylladb-dev...@googlegroups.com.
To post to this group, send email to scylla...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/scylladb-dev/CAD-J%3DzaKjCBCrWX71P%2BVCJJrujorf7jD%3DQDfRjiJ-PyHhkAy9g%40mail.gmail.com.
For more options, visit https://groups.google.com/d/optout.

Avi Kivity

<avi@scylladb.com>
unread,
Sep 15, 2016, 3:47:17 AM9/15/16
to Glauber Costa, scylladb-dev@googlegroups.com
I think we could specialize it further to avoid the _delegate business,
which is never needed when flushing (and so we can eliminate the
priority class parameter which is sticking out like a sore thumb).

Avi Kivity

<avi@scylladb.com>
unread,
Sep 15, 2016, 3:50:54 AM9/15/16
to Glauber Costa, scylladb-dev@googlegroups.com
Ah, the next patch does that.

Avi Kivity

<avi@scylladb.com>
unread,
Sep 15, 2016, 4:03:32 AM9/15/16
to Glauber Costa, scylladb-dev@googlegroups.com


On 09/15/2016 06:57 AM, Glauber Costa wrote:
The standard approach is to inherit from MemoryAccounter (Accountant?).
This means you don't need an extern object and the no-op implementation
has zero overhead (not even the extra reference).

The yes-op implementation contains the reference.

>
> uint64_t _reclaim_counter;
> unsigned _version_count = 0;
> @@ -329,6 +338,7 @@ class partition_snapshot_reader : public streamed_mutation::impl {
> }();
>
> if (cr != cr_end) {
> + _mem_accounter.account_component(*cr);

To make this legible (with the standard appraoch I outlined above), we
can write

MemoryAccounter& mem_accounter() { return *this; }

and this line changes to mem_accounter().account_component(*cr);.

> _clustering_rows.emplace_back(rows_position { cr, cr_end });
> }
> }
> @@ -352,6 +362,7 @@ class partition_snapshot_reader : public streamed_mutation::impl {
> mutation_fragment_opt sr;
> for (auto&& v : _snapshot->versions()) {
> if (!v.partition().static_row().empty()) {
> + _mem_accounter.account_component(v.partition().static_row());
> if (!sr) {
> sr = mutation_fragment(static_row(v.partition().static_row()));
> } else {
> @@ -366,6 +377,7 @@ class partition_snapshot_reader : public streamed_mutation::impl {
> if (!_clustering_rows.empty()) {
> auto rt = _range_tombstones.peek_next(*_clustering_rows.front()._position);
> if (rt) {
> + _mem_accounter.account_component(*rt);
> return _range_tombstones.get_next(rt);
> }
>
> @@ -382,6 +394,9 @@ class partition_snapshot_reader : public streamed_mutation::impl {
> return mutation_fragment(std::move(result));
> }
> auto rt = _range_tombstones.peek_next();
> + if (rt) {
> + _mem_accounter.account_component(*rt);
> + }
> return _range_tombstones.get_next(rt);

Maybe a simpler approach is to change get_next() to have the form

template <typename MemoryAccounter = NullAccounter>
mutation_fragment_opt get_next(MemoryAccounter ma = MemoryAccounter());

and now there is no peek_next/get_next combination you have to get
right. Since ma is just a reference passing it is cheap.

Paweł Dziepak

<pdziepak@scylladb.com>
unread,
Sep 15, 2016, 5:27:42 AM9/15/16
to Glauber Costa, scylladb-dev
On 15 September 2016 at 04:57, Glauber Costa <gla...@scylladb.com> wrote:
Currently, code interacting with range_tombstone_streams uses the get_next() API
to convert the next available tombstone in the list to a mutation fragment.

In order to properly account the range tombstone, it is desirable to do it
earlier, since the creation of the mutation fragment may move the data to a
different region.

When pushed to range_tombstone_stream range tombstones are already copied from LSA region to standard memory. If you need precise measurement you should do so before pushing range tombstones to range_tombstone_stream (or even better, change partition_snaphsot_reader so that it properly streams range tombstones as well, but that's obviously out of scope of this series). So I don't really understand what is the benefit of this patch.
This is definitely not a correct name for this function. The other get_next() overload compare the argument with the next element in the stream and decide which one should go fix. This particular overload is just the other half of the original get_next() after extracting peek_next(). I would suggest a better name but I am not good at this and I am not exactly sure this patch is really necessary (see my comment above).
 
--
2.5.5

--
You received this message because you are subscribed to the Google Groups "ScyllaDB development" group.
To unsubscribe from this group and stop receiving emails from it, send an email to scylladb-dev+unsubscribe@googlegroups.com.

To post to this group, send email to scylla...@googlegroups.com.

Paweł Dziepak

<pdziepak@scylladb.com>
unread,
Sep 15, 2016, 5:34:44 AM9/15/16
to Glauber Costa, scylladb-dev
On 15 September 2016 at 04:57, Glauber Costa <gla...@scylladb.com> wrote:
Looks like  _mem_accounter.account_component() in pop_clustering_row() is missing.
 
--
2.5.5

--
You received this message because you are subscribed to the Google Groups "ScyllaDB development" group.
To unsubscribe from this group and stop receiving emails from it, send an email to scylladb-dev+unsubscribe@googlegroups.com.
To post to this group, send email to scylla...@googlegroups.com.

Glauber Costa

<glauber@scylladb.com>
unread,
Sep 15, 2016, 7:47:47 AM9/15/16
to Avi Kivity, scylladb-dev

I have never tested it explicitly in the past but I don't remember the slowdown being that bad. Maybe my disk os getting old ?

Glauber Costa

<glauber@scylladb.com>
unread,
Sep 15, 2016, 7:52:31 AM9/15/16
to Paweł Dziepak, scylladb-dev
Why do I need to account in pop_clustering_rows? That one is just popping what we have emplaced here. If I do that, I would be accounting the clustering row twice, no?

Paweł Dziepak

<pdziepak@scylladb.com>
unread,
Sep 15, 2016, 8:03:53 AM9/15/16
to Glauber Costa, scylladb-dev
_clustering_rows keeps only iterators to clustering rows inside LSA region. In this patch you just call account_component() in refresh_iterators() when the iterators in that vector are refreshed because some other action invalidated them or we moved to another clustering row range. Basically, this code accounts clustering row when the reader gets an iterator to it. Now, take a look at pop_clustering_row():

void partition_snapshot_reader::pop_clustering_row()
{
    auto& current = _clustering_rows.back();
    current._position = std::next(current._position);  // <<<<<<<<< be sure to notice this line
    if (current._position == current._end) {
        _clustering_rows.pop_back();
    } else {
        boost::range::push_heap(_clustering_rows, heap_compare(_cmp));
    }
}

The marked line is the place when we increase the iterator and move to the next clustering row in case refresh is not necessary. With the current code such rows will never be accounted for.

That being said, indeed, account_component() in pop_clustering_row() may result in rows being accounted for twice in case when the iterator is later invalidated. I believe that account_component() for clustering rows should only be called in partition_snapshot_reader::read_next() when we create a clustering row from _clustering_rows.back()._position or apply _clustering_rows.back()._position to the current clustering row. In general that would make the clustering row to be accounted for only when it is actually consumed instead of when one of the iterators used by the reader reaches its position which would made the estimate slightly more accurate.

Paweł Dziepak

<pdziepak@scylladb.com>
unread,
Sep 15, 2016, 8:27:53 AM9/15/16
to Glauber Costa, scylladb-dev
On 15 September 2016 at 04:57, Glauber Costa <gla...@scylladb.com> wrote:
I'm afraid this may fail, see my comment below.
What worries me is all that extra effort put into being as accurate as possible in calculating the size in LSA region of already flushed mutation fragment while MVCC can very easily ruin all estimates.
For example, let's say that in the memtable we have a single partition with a single clustering row which at the moment of flushing has three version each of size X. Flushing reader accounts all these versions so _bytes_read is set to 3*X (+ some additional partition overhead). Then the other readers of that row are finished, versions are squashed and we end up with a single version of size X (because we got only overrides). memtable used size is now X (+ some additional partition overhead) while _bytes_read is 3*X (+ some additional partition overhead).
 
--
You received this message because you are subscribed to the Google Groups "ScyllaDB development" group.
To unsubscribe from this group and stop receiving emails from it, send an email to scylladb-dev+unsubscribe@googlegroups.com.
To post to this group, send email to scylla...@googlegroups.com.

Glauber Costa

<glauber@scylladb.com>
unread,
Sep 19, 2016, 9:51:54 AM9/19/16
to scylladb-dev@googlegroups.com, Glauber Costa
Available at:

To g...@github.com:glommer/scylla.git virtual-dirty-v3

Description:
============

Scylla currently suffers from a brick wall behavior of the request throttler.
Requests pile up until we reach the dirty memory limit, at which point we stop
serving them until we have freed enough memory to allow for more requests.

The problem is that freeing dirty memory means writing an SSTable to completion.
That can take a long time, even if we are blessed with great disks. Those long
waiting times can and will translate into timeouts. That is bad behavior.

What this patch does is introduce one form of virtual dirty memory accounting.
Instead of allowing 100 % of the dirty memory to be filled up until we stop
accepting requests, we will do that when we reach 50 % of memory. However,
instead of releasing requests only when an SSTable is fully written, we start
releasing them when some memory was written.

The practical effect of that, is that once we reach 50 % occupancy in our dirty
memory region, we will bring the system from CPU speed to disk speed, and will
start accepting requests only at the rate we are able to write memory back.

Results
=======

With this patchset running a load big enough to easily saturate the disk,
(commitlog disabled to highlight the effects of the memtable writer), I am able
to run scylla for many minutes, with timeouts occurring only when I run out of
disk space, whereas without this patch a swarm of timeouts would start merely 2
seconds after the load started - and would never get stable.

In V2, I have sent a set of graphs illustrating the performance of this solution.
This version does not have any significant differences in that front.

For details, please refer to
https://groups.google.com/d/msg/scylladb-dev/iCvD-3Z-QqY/EM8KUh_MAQAJ

Accuracy of the accounting:
---------------------------
It is important for us to be as accurate as possible when accounting freed
memory, since every byte we mark as freed may allow one or more requests to be
executed. I have measured the accuracy of this approach (ignoring padding,
object size for the mutation fragments) to be 99.83 % of used memory in the
test workload I have ran (large, 65k mutations). Memtables under this circumnstance
tend to have a very high occupancy ratio because throttle breeds idle, and idle
breeds compact-on-idle.

Known Issues:
-------------

A lot of time can be elapsed between destroying the flush_reader and actually
releasing memory. The release of memory only happens when the SSTable is fully
sealed, and we have to flush the files, as well as finish writing all SSTable
components at this point. It is still unclear why, but it seems to stem from
the same issue that is plaguing the commitlog.

In any case, while we could mitigate this issue, we can't really fix it. Even
if we decide to optimistically add the free_space() remaining to the virtual
dirty area - allowing more requests to come in, a new flush cannot be initiated
until this one finishes (assuming max concurrency already used). This means
requests will be blocked for this period.


Changes from V2:
================
- The delay-blocked_requests_max counter is removed. It is important to have but
not crucial, and in its current form it lacks important information about peaks.
It will be done later as a quantile.
- Revert back to accounting mutation fragments.
- Fix the problem with accounting snapshots, which can lead to overaccounting. Do
this by reverting back to accounting MFs). We lose a bit of accuracy, but there
seem to be no simple way to properly handle snapshot partitions otherwise.

Changes from V1:
================
- Only used space is accounted for, and we don't account for padding, or free
space in the region. While this slow us down a bit, it is not significant and
it simplifies the code a lot - since we don't have to have extra protections
against compactions
- get rid of virtual functions
- account objects before they are transformed into a mutation fragment -
guaranteeing that they will be in the correct region.

Changes from RFC:
=================
- The accounting was moved to the reader side (mutation_reader), instead of being
done by at the SSTable side.
- Accounting of relevant internal structures (memtable_entry, rows entry) is more
precise.

Glauber Costa (9):
database: export virtual dirty bytes region group
LSA: export information about size of the throttle queue
LSA: export information about object memory footprint
sstables: use special reader for writing a memtable
memtables: split scanning reader in two
LSA: allow a group to query its own region group
move partition_snapshot_reader code to header file
add accounting of memory read to partition_snapshot_reader
database: allow virtual dirty memory management

database.hh | 25 ++++++
memtable.hh | 5 +-
partition_version.hh | 204 ++++++++++++++++++++++++++++++++++++++++---
utils/allocation_strategy.hh | 9 ++
utils/logalloc.hh | 6 ++
database.cc | 37 +++++++-
memtable.cc | 199 +++++++++++++++++++++++++++++++++++------
partition_version.cc | 178 -------------------------------------
sstables/sstables.cc | 2 +-
utils/logalloc.cc | 23 ++++-
10 files changed, 462 insertions(+), 226 deletions(-)

--
2.5.5

Glauber Costa

<glauber@scylladb.com>
unread,
Sep 19, 2016, 9:51:56 AM9/19/16
to scylladb-dev@googlegroups.com, Glauber Costa
We allocate objects of a certain size, but we use a bit more memory to hold
them. To get a clerer picture about how much memory will an object cost us, we
need help from the allocator. This patch exports an interface that allow users
to query into a specific allocator to get that information.

Signed-off-by: Glauber Costa <gla...@scylladb.com>
---
diff --git a/utils/logalloc.cc b/utils/logalloc.cc
index c244324..17a81f4 100644
--- a/utils/logalloc.cc
+++ b/utils/logalloc.cc
+
// Merges another region into this region. The other region is made
// to refer to this region.
// Doesn't invalidate references to allocated objects.
--
2.5.5

Glauber Costa

<glauber@scylladb.com>
unread,
Sep 19, 2016, 9:51:56 AM9/19/16
to scylladb-dev@googlegroups.com, Glauber Costa
Also add information about for how long has the oldest been sitting in the
queue. This is part of the backpressure work to allow us to throttle incoming
requests if we won't have memory to process them. Shortages can happen in all
sorts of places, and it is useful when designing and testing the solutions to
know where they are, and how bad they are.

This counter is named for consistency after similar counters from transport/.

Signed-off-by: Glauber Costa <gla...@scylladb.com>
---
utils/logalloc.hh | 4 ++++
database.cc | 9 +++++++++
2 files changed, 13 insertions(+)

diff --git a/utils/logalloc.hh b/utils/logalloc.hh
index db8ab35..1b104ec 100644
--- a/utils/logalloc.hh
+++ b/utils/logalloc.hh
@@ -318,6 +318,10 @@ class region_group {
_shutdown_requested = true;
return _asynchronous_gate.close();
}
+
+ size_t blocked_requests() {
+ return _blocked_requests.size();
+ }
private:
// Make sure we get a notification and can call release_requests when one of our ancestors that
// used to block us is no longer under memory pressure.
diff --git a/database.cc b/database.cc
index 2b52627..012eaf2 100644
--- a/database.cc
+++ b/database.cc
@@ -1585,6 +1585,15 @@ database::setup_collectd() {
));

_collectd.push_back(
+ scollectd::add_polled_metric(scollectd::type_instance_id("database"
+ , scollectd::per_cpu_plugin_instance
+ , "queue_length", "requests_blocked_memory")
+ , scollectd::make_typed(scollectd::data_type::GAUGE, [this] {
+ return _dirty_memory_manager.region_group().blocked_requests();
+ })
+ ));
+
+ _collectd.push_back(

Glauber Costa

<glauber@scylladb.com>
unread,
Sep 19, 2016, 9:51:56 AM9/19/16
to scylladb-dev@googlegroups.com, Glauber Costa
Currently, we export the region group where memtables are placed as dirty bytes.
Upcoming patches will optimistically mark some bytes in this region as free, a
scheme we know as "virtual dirty".

We are still interested in knowing the real state of the dirty region, so we
will keep track of the bytes virtually freed and split the counters in two.

Signed-off-by: Glauber Costa <gla...@scylladb.com>
---
database.hh | 25 +++++++++++++++++++++++++
database.cc | 10 +++++++++-
2 files changed, 34 insertions(+), 1 deletion(-)

diff --git a/database.hh b/database.hh
index 318c1c3..904921f 100644
--- a/database.hh
+++ b/database.hh
@@ -74,6 +74,7 @@
#include <seastar/core/rwlock.hh>
#include <seastar/core/shared_future.hh>
#include "tracing/trace_state.hh"
+#include <boost/intrusive/parent_from_member.hpp>

class frozen_mutation;
class reconcilable_result;
@@ -133,6 +134,7 @@ class dirty_memory_manager: public logalloc::region_group_reclaimer {
// default values here.
size_t _concurrency;
semaphore _flush_serializer;
+ int64_t _dirty_bytes_released_pre_accounted = 0;

seastar::gate _waiting_flush_gate;
std::vector<shared_memtable> _pending_flushes;
@@ -156,6 +158,11 @@ class dirty_memory_manager: public logalloc::region_group_reclaimer {
, _region_group(&parent->_region_group, *this)
, _concurrency(concurrency)
, _flush_serializer(concurrency) {}
+
+ static dirty_memory_manager& from_region_group(logalloc::region_group *rg) {
+ return *(boost::intrusive::get_parent_from_member(rg, &dirty_memory_manager::_region_group));
+ }
+
logalloc::region_group& region_group() {
return _region_group;
}
@@ -164,6 +171,24 @@ class dirty_memory_manager: public logalloc::region_group_reclaimer {
return _region_group;
}

+ void revert_potentially_cleaned_up_memory(int64_t delta) {
+ _region_group.update(delta);
+ _dirty_bytes_released_pre_accounted -= delta;
+ }
+
+ void account_potentially_cleaned_up_memory(int64_t delta) {
+ _region_group.update(-delta);
+ _dirty_bytes_released_pre_accounted += delta;
+ }
+
+ size_t real_dirty_memory() const {
+ return _region_group.memory_used() + _dirty_bytes_released_pre_accounted;
+ }
+
+ size_t virtual_dirty_memory() const {
+ return _region_group.memory_used();
+ }
+
template <typename Func>
future<> serialize_flush(Func&& func) {
return seastar::with_gate(_waiting_flush_gate, [this, func] () mutable {
diff --git a/database.cc b/database.cc
index e8849ea..2b52627 100644
--- a/database.cc
+++ b/database.cc
@@ -1566,7 +1566,15 @@ database::setup_collectd() {
, scollectd::per_cpu_plugin_instance
, "bytes", "dirty")
, scollectd::make_typed(scollectd::data_type::GAUGE, [this] {
- return dirty_memory_region_group().memory_used();
+ return _dirty_memory_manager.real_dirty_memory();
+ })));
+
+ _collectd.push_back(
+ scollectd::add_polled_metric(scollectd::type_instance_id("memory"
+ , scollectd::per_cpu_plugin_instance
+ , "bytes", "virtual_dirty")
+ , scollectd::make_typed(scollectd::data_type::GAUGE, [this] {
+ return _dirty_memory_manager.virtual_dirty_memory();
})));

_collectd.push_back(
--
2.5.5

Glauber Costa

<glauber@scylladb.com>
unread,
Sep 19, 2016, 9:51:56 AM9/19/16
to scylladb-dev@googlegroups.com, Glauber Costa
Right now the special reader doesn't do much, but the idea is that we will
soon replace it will a reader that specializes in flush, and is in turn able
to provide read-side on-flush functionality like virtual dirty.

Signed-off-by: Glauber Costa <gla...@scylladb.com>
---
memtable.hh | 3 +++
memtable.cc | 5 +++++
sstables/sstables.cc | 2 +-
3 files changed, 9 insertions(+), 1 deletion(-)

diff --git a/memtable.hh b/memtable.hh
index c6f9960..98fc7eb 100644
--- a/memtable.hh
+++ b/memtable.hh
@@ -147,6 +147,9 @@ class memtable final : public enable_lw_shared_from_this<memtable>, private loga
const query::partition_slice& slice = query::full_slice,
const io_priority_class& pc = default_priority_class());

+
+ mutation_reader make_flush_reader(schema_ptr);
+
mutation_source as_data_source();
key_source as_key_source();

diff --git a/memtable.cc b/memtable.cc
index ac6b04a..93f3d02 100644
--- a/memtable.cc
+++ b/memtable.cc
@@ -217,6 +217,11 @@ memtable::make_reader(schema_ptr s,
}
}

+mutation_reader
+memtable::make_flush_reader(schema_ptr s) {
+ return make_mutation_reader<scanning_reader>(std::move(s), shared_from_this(), query::full_partition_range, query::no_clustering_key_filtering, default_priority_class());
+}
+
void
memtable::update(const db::replay_position& rp) {
if (_replay_position < rp) {
diff --git a/sstables/sstables.cc b/sstables/sstables.cc
index 8a1a679..c1caf33 100644
--- a/sstables/sstables.cc
+++ b/sstables/sstables.cc
@@ -1785,7 +1785,7 @@ void components_writer::consume_end_of_stream() {

future<> sstable::write_components(memtable& mt, bool backup, const io_priority_class& pc, bool leave_unsealed) {
_collector.set_replay_position(mt.replay_position());
- return write_components(mt.make_reader(mt.schema()),
+ return write_components(mt.make_flush_reader(mt.schema()),
mt.partition_count(), mt.schema(), std::numeric_limits<uint64_t>::max(), backup, pc, leave_unsealed);
}

--
2.5.5

Glauber Costa

<glauber@scylladb.com>
unread,
Sep 19, 2016, 9:51:58 AM9/19/16
to scylladb-dev@googlegroups.com, Glauber Costa
Signed-off-by: Glauber Costa <gla...@scylladb.com>
---
utils/logalloc.hh | 2 ++
utils/logalloc.cc | 8 ++++++++
2 files changed, 10 insertions(+)

diff --git a/utils/logalloc.hh b/utils/logalloc.hh
index 1b104ec..a38307a 100644
--- a/utils/logalloc.hh
+++ b/utils/logalloc.hh
@@ -566,6 +566,8 @@ class region {

allocation_strategy& allocator();

+ region_group* group();
+
// Merges another region into this region. The other region is left empty.
// Doesn't invalidate references to allocated objects.
void merge(region& other);
diff --git a/utils/logalloc.cc b/utils/logalloc.cc
index 17a81f4..ef85815 100644
--- a/utils/logalloc.cc
+++ b/utils/logalloc.cc

Glauber Costa

<glauber@scylladb.com>
unread,
Sep 19, 2016, 9:51:58 AM9/19/16
to scylladb-dev@googlegroups.com, Glauber Costa
The code that is common will live in its own reader, the iterator_reader. All
friendly private access to memtable attributes and methods happen through the
iterator reader.

After this patch, we are now left with the scanning_reader - same as always,
but now implemented on top of the iterator_reader, and a flush_reader, which
will be used by SSTable flushes only.

Signed-off-by: Glauber Costa <gla...@scylladb.com>
---
memtable.hh | 2 +-
memtable.cc | 127 ++++++++++++++++++++++++++++++++++++++++++++++--------------
2 files changed, 98 insertions(+), 31 deletions(-)

diff --git a/memtable.hh b/memtable.hh
index 98fc7eb..a0a5cbd 100644
--- a/memtable.hh
+++ b/memtable.hh
@@ -161,5 +161,5 @@ class memtable final : public enable_lw_shared_from_this<memtable>, private loga
return _replay_position;
}

- friend class scanning_reader;
+ friend class iterator_reader;
};
diff --git a/memtable.cc b/memtable.cc
index 93f3d02..3a1743b 100644
--- a/memtable.cc
+++ b/memtable.cc
+ return nullptr;
+ } else {
+ return {};
+ }
+
virtual future<streamed_mutation_opt> operator()() override {
+ if (!e) {
return make_ready_future<streamed_mutation_opt>(stdx::nullopt);

Glauber Costa

<glauber@scylladb.com>
unread,
Sep 19, 2016, 9:51:59 AM9/19/16
to scylladb-dev@googlegroups.com, Glauber Costa
This is so we can template it without worrying about declaring the
specializations in the .cc file.

Signed-off-by: Glauber Costa <gla...@scylladb.com>
---
partition_version.hh | 172 +++++++++++++++++++++++++++++++++++++++++++++----
partition_version.cc | 178 ---------------------------------------------------
2 files changed, 161 insertions(+), 189 deletions(-)

diff --git a/partition_version.hh b/partition_version.hh
index 45a3e13..6c57ab0 100644
--- a/partition_version.hh
+++ b/partition_version.hh
@@ -311,23 +311,173 @@ class partition_snapshot_reader : public streamed_mutation::impl {
uint64_t _reclaim_counter;
unsigned _version_count = 0;
private:
- void refresh_iterators();
- void pop_clustering_row();
+ void refresh_iterators() {
+ _clustering_rows.clear();

- mutation_fragment_opt read_static_row();
- mutation_fragment_opt read_next();
- void do_fill_buffer();
- static tombstone tomb(partition_snapshot& snp);
+ if (!_in_ck_range && _current_ck_range == _ck_range_end) {
+ return;
+ }
+
partition_snapshot_reader(schema_ptr s, dht::decorated_key dk, lw_shared_ptr<partition_snapshot> snp,
query::clustering_key_filter_ranges crr,
logalloc::region& region, logalloc::allocating_section& read_section,
+ }
+
+ virtual future<> fill_buffer() override {
+ return _read_section(_lsa_region, [&] {
+ return with_linearized_managed_bytes([&] {
+ do_fill_buffer();
+ return make_ready_future<>();
+ });
+ });
+ }
};

-streamed_mutation make_partition_snapshot_reader(schema_ptr s, dht::decorated_key dk,
+inline streamed_mutation
+make_partition_snapshot_reader(schema_ptr s, dht::decorated_key dk,
query::clustering_key_filter_ranges crr,
lw_shared_ptr<partition_snapshot> snp, logalloc::region& region,
- logalloc::allocating_section& read_section, boost::any pointer_to_container);
+ logalloc::allocating_section& read_section, boost::any pointer_to_container)
+{
+ return make_streamed_mutation<partition_snapshot_reader>(s, std::move(dk),
+ snp, std::move(crr), region, read_section, std::move(pointer_to_container), no_accounter);
+}
diff --git a/partition_version.cc b/partition_version.cc
index 0a92a14..e61acde 100644
--- a/partition_version.cc
+++ b/partition_version.cc
@@ -291,181 +291,3 @@ lw_shared_ptr<partition_snapshot> partition_entry::read(schema_ptr entry_schema)
return snp;
}
}
-
-partition_snapshot_reader::partition_snapshot_reader(schema_ptr s, dht::decorated_key dk,
- lw_shared_ptr<partition_snapshot> snp,
- query::clustering_key_filter_ranges crr, logalloc::region& region,
- logalloc::allocating_section& read_section, boost::any pointer_to_container)
- : streamed_mutation::impl(s, std::move(dk), tomb(*snp))
- , _container_guard(std::move(pointer_to_container))
- , _ck_ranges(std::move(crr))
- , _current_ck_range(_ck_ranges.begin())
- , _ck_range_end(_ck_ranges.end())
- , _cmp(*s)
- , _eq(*s)
- , _snapshot(snp)
- , _range_tombstones(*s)
- , _lsa_region(region)
- , _read_section(read_section)
- if (!_clustering_rows.empty()) {
- auto mf = _range_tombstones.get_next(*_clustering_rows.front()._position);
- if (mf) {
- return mf;
- lw_shared_ptr<partition_snapshot> snp, logalloc::region& region,
- logalloc::allocating_section& read_section, boost::any pointer_to_container)
-{
- return make_streamed_mutation<partition_snapshot_reader>(s, std::move(dk),
- snp, std::move(crr), region, read_section, std::move(pointer_to_container));
-}
--
2.5.5

Glauber Costa

<glauber@scylladb.com>
unread,
Sep 19, 2016, 9:52:01 AM9/19/16
to scylladb-dev@googlegroups.com, Glauber Costa
Scylla currently suffers from a brick wall behavior of the request throttler.
Requests pile up until we reach the dirty memory limit, at which point we stop
serving them until we have freed enough memory to allow for more requests.

The problem is that freeing dirty memory means writing an SSTable to completion.
That can take a long time, even if we are blessed with great disks. Those long
waiting times can and will translate into timeouts. That is bad behavior.

What this patch does is introduce one form of virtual dirty memory accounting.
Instead of allowing 100 % of the dirty memory to be filled up until we stop
accepting requests, we will do that when we reach 50 % of memory. However,
instead of releasing requests only when an SSTable is fully written, we start
releasing them when some memory was written.

The practical effect of that is that once we reach 50 % occupancy in our dirty
memory region, we will bring the system from CPU speed to disk speed, and will
start accepting requests only at the rate we are able to write memory back.

Signed-off-by: Glauber Costa <gla...@scylladb.com>
---
database.cc | 18 ++++++++++++++--
memtable.cc | 71 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
2 files changed, 86 insertions(+), 3 deletions(-)

diff --git a/database.cc b/database.cc
index 012eaf2..07224d3 100644
--- a/database.cc
+++ b/database.cc
@@ -1548,8 +1548,22 @@ database::database(const db::config& cfg)
// in a different region group. This is because throttled requests are serviced in FIFO order,
// and we don't want system requests to be waiting for a long time behind user requests.
, _system_dirty_memory_manager(*this, _memtable_total_space + (10 << 20))
- , _dirty_memory_manager(*this, &_system_dirty_memory_manager, _memtable_total_space)
- , _streaming_dirty_memory_manager(*this, &_dirty_memory_manager, _streaming_memtable_total_space)
+ // The total space that can be used by memtables is _memtable_total_space, but we will only
+ // allow the region_group to grow to half of that. This is because of virtual_dirty: memtables
+ // can take a long time to flush, and if we are using the maximum amount of memory possible,
+ // then requests will block until we finish flushing at least one memtable.
+ //
+ // We can free memory until the whole memtable is flushed because we need to keep it in memory
+ // until the end, but we can fake freeing memory. When we are done with an element of the
+ // memtable, we will update the region group pretending memory just went down by that amount.
+ //
+ // Because the amount of memory that we pretend to free should be close enough to the actual
+ // memory used by the memtables, that effectively creates two sub-regions inside the dirty
+ // region group, of equal size. In the worst case, we will have _memtable_total_space dirty
+ // bytes used, and half of that already virtually freed.
+ , _dirty_memory_manager(*this, &_system_dirty_memory_manager, _memtable_total_space / 2)
+ // The same goes for streaming in respect to virtual dirty.
+ , _streaming_dirty_memory_manager(*this, &_dirty_memory_manager, _streaming_memtable_total_space / 2)
, _version(empty_version)
, _enable_incremental_backups(cfg.incremental_backups())
{
diff --git a/memtable.cc b/memtable.cc
index 3a1743b..33bf39d 100644
--- a/memtable.cc
+++ b/memtable.cc
@@ -20,6 +20,7 @@
*/

#include "memtable.hh"
+#include "database.hh"
#include "frozen_mutation.hh"
#include "sstable_mutation_readers.hh"

@@ -240,11 +241,75 @@ class scanning_reader final: public iterator_reader {
}
};

+class flush_memory_accounter {
+ uint64_t _bytes_read = 0;
+ logalloc::region& _region;
+
+public:
+ void update_bytes_read(uint64_t delta) {
+ _bytes_read += delta;
+ dirty_memory_manager::from_region_group(_region.group()).account_potentially_cleaned_up_memory(delta);
+ }
+
+ explicit flush_memory_accounter(logalloc::region& region)
+ : _region(region)
+ {}
+
+ ~flush_memory_accounter() {
+ assert(_bytes_read <= _region.occupancy().used_space());
+ dirty_memory_manager::from_region_group(_region.group()).revert_potentially_cleaned_up_memory(_bytes_read);
+ }
+ void account_component(memtable_entry& e) {
+ auto delta = with_allocator(_region.allocator(), [&e] () -> uint64_t {
+ return current_allocator().object_memory_size_in_allocator(&e) +
+ current_allocator().object_memory_size_in_allocator(&*(partition_snapshot(e.schema(), &(e.partition())).version())) +
+ e.key().key().memory_usage();
+ });
+ update_bytes_read(delta);
+ }
+};
+
+class partition_snapshot_accounter {
+ flush_memory_accounter& _accounter;
+public:
+ partition_snapshot_accounter(flush_memory_accounter& acct): _accounter(acct) {}
+
+ // We will be passed mutation fragments here, and they are allocated using the standard
+ // allocator. So we can't compute the size in memtable precisely. However, precise accounting is
+ // hard anyway, since we may be holding multiple snapshots of the partitions, and the
+ // partition_snapshot_reader may compose them. In doing so, we move memory to the standard
+ // allocation. As long as our size read here is lesser or equal to the size in the memtables, we
+ // are safe, and worst case we will allow a bit fewer requests in.
+ void operator()(const range_tombstone& rt) {
+ _accounter.update_bytes_read(rt.memory_usage());
+ }
+
+ void operator()(const static_row& sr) {
+ _accounter.update_bytes_read(sr.memory_usage());
+ }
+
+ void operator()(const clustering_row& cr) {
+ // Every clustering row is stored in a rows_entry object, and that has some significant
+ // overhead - so add it here. We will be a bit short on our estimate because we can't know
+ // what is the size in the allocator for this rows_entry object: we may have many snapshots,
+ // and we don't know which one(s) contributed to the generation of this mutation fragment.
+ //
+ // We will add the size of the struct here, and that should be good enough.
+ _accounter.update_bytes_read(sizeof(rows_entry) + cr.memory_usage());
+ }
+};
+
class flush_reader final : public iterator_reader {
+ flush_memory_accounter _flushed_memory;
public:
flush_reader(schema_ptr s, lw_shared_ptr<memtable> m)
: iterator_reader(std::move(s), std::move(m), query::full_partition_range)
+ , _flushed_memory(region())
{}
+ flush_reader(const flush_reader&) = delete;
+ flush_reader(flush_reader&&) = delete;
+ flush_reader& operator=(flush_reader&&) = delete;
+ flush_reader& operator=(const flush_reader&) = delete;

virtual future<streamed_mutation_opt> operator()() override {
logalloc::reclaim_lock _(region());
@@ -253,7 +318,11 @@ class flush_reader final : public iterator_reader {
if (!e) {
return make_ready_future<streamed_mutation_opt>(stdx::nullopt);
} else {
- return make_ready_future<streamed_mutation_opt>((*e).read(mtbl(), schema(), query::full_slice));
+ auto cr = query::clustering_key_filter_ranges::get_ranges(*schema(), query::full_slice, e->key().key());
+ auto snp = e->partition().read(schema());
+ auto mpsr = make_partition_snapshot_reader<partition_snapshot_accounter>(schema(), e->key(), std::move(cr), snp, region(), read_section(), mtbl(), _flushed_memory);

Glauber Costa

<glauber@scylladb.com>
unread,
Sep 19, 2016, 9:52:01 AM9/19/16
to scylladb-dev@googlegroups.com, Glauber Costa
By default, we don't do any accounting. By specializing this class and providing
an accounter class, we can account how much memory are we reading as we read
through the elements.

Signed-off-by: Glauber Costa <gla...@scylladb.com>
---
partition_version.hh | 42 ++++++++++++++++++++++++++++++++++++------
1 file changed, 36 insertions(+), 6 deletions(-)

diff --git a/partition_version.hh b/partition_version.hh
index 6c57ab0..0bc5258 100644
--- a/partition_version.hh
+++ b/partition_version.hh
@@ -271,7 +271,15 @@ inline partition_version_ref& partition_snapshot::version()
}
}

-class partition_snapshot_reader : public streamed_mutation::impl {
+struct partition_snapshot_reader_dummy_accounter {
+ void operator()(const clustering_row& cr) {}
+ void operator()(const static_row& sr) {}
+ void operator()(const range_tombstone& rt) {}
+};
+extern partition_snapshot_reader_dummy_accounter no_accounter;
+
+template <typename MemoryAccounter = partition_snapshot_reader_dummy_accounter>
+class partition_snapshot_reader : public streamed_mutation::impl, public MemoryAccounter {
struct rows_position {
mutation_partition::rows_type::const_iterator _position;
mutation_partition::rows_type::const_iterator _end;
@@ -308,6 +316,10 @@ class partition_snapshot_reader : public streamed_mutation::impl {
logalloc::region& _lsa_region;
logalloc::allocating_section& _read_section;

+ MemoryAccounter& mem_accounter() {
+ return *this;
+ }
+
uint64_t _reclaim_counter;
unsigned _version_count = 0;
private:
@@ -384,11 +396,16 @@ class partition_snapshot_reader : public streamed_mutation::impl {
return _range_tombstones.get_next();
}

+ void emplace_mutation_fragment(mutation_fragment&& mfopt) {
+ mfopt.visit(mem_accounter());
+ _buffer.emplace_back(std::move(mfopt));
+ }
+
void do_fill_buffer() {
if (!_last_entry) {
auto mfopt = read_static_row();
if (mfopt) {
- _buffer.emplace_back(std::move(*mfopt));
+ emplace_mutation_fragment(std::move(*mfopt));
}
}

@@ -408,7 +425,7 @@ class partition_snapshot_reader : public streamed_mutation::impl {

auto mfopt = read_next();
if (mfopt) {
- _buffer.emplace_back(std::move(*mfopt));
+ emplace_mutation_fragment(std::move(*mfopt));
} else {
_end_of_stream = true;
}
@@ -423,11 +440,13 @@ class partition_snapshot_reader : public streamed_mutation::impl {
return t;
}
public:
+ template <typename... Args>
partition_snapshot_reader(schema_ptr s, dht::decorated_key dk, lw_shared_ptr<partition_snapshot> snp,
query::clustering_key_filter_ranges crr,
logalloc::region& region, logalloc::allocating_section& read_section,
- boost::any pointer_to_container)
+ boost::any pointer_to_container, Args&&... args)
: streamed_mutation::impl(s, std::move(dk), tomb(*snp))
+ , MemoryAccounter(std::forward<Args>(args)...)
, _container_guard(std::move(pointer_to_container))
, _ck_ranges(std::move(crr))
, _current_ck_range(_ck_ranges.begin())
@@ -472,12 +491,23 @@ class partition_snapshot_reader : public streamed_mutation::impl {
}
};

+template <typename MemoryAccounter, typename... Args>
+inline streamed_mutation
+make_partition_snapshot_reader(schema_ptr s, dht::decorated_key dk,
+ query::clustering_key_filter_ranges crr,
+ lw_shared_ptr<partition_snapshot> snp, logalloc::region& region,
+ logalloc::allocating_section& read_section, boost::any pointer_to_container, Args&&... args)
+{
+ return make_streamed_mutation<partition_snapshot_reader<MemoryAccounter>>(s, std::move(dk),
+ snp, std::move(crr), region, read_section, std::move(pointer_to_container), std::forward<Args>(args)...);
+}
+
inline streamed_mutation
make_partition_snapshot_reader(schema_ptr s, dht::decorated_key dk,
query::clustering_key_filter_ranges crr,
lw_shared_ptr<partition_snapshot> snp, logalloc::region& region,
logalloc::allocating_section& read_section, boost::any pointer_to_container)
{
- return make_streamed_mutation<partition_snapshot_reader>(s, std::move(dk),
- snp, std::move(crr), region, read_section, std::move(pointer_to_container), no_accounter);
+ return make_streamed_mutation<partition_snapshot_reader<>>(s, std::move(dk),
+ snp, std::move(crr), region, read_section, std::move(pointer_to_container));
}
--
2.5.5

Glauber Costa

<glauber@scylladb.com>
unread,
Sep 23, 2016, 11:26:13 AM9/23/16
to scylladb-dev, Glauber Costa
ping

Raphael Carvalho

<raphaelsc@scylladb.com>
unread,
Sep 23, 2016, 3:19:56 PM9/23/16
to Glauber Costa, scylladb-dev
On Mon, Sep 19, 2016 at 10:51 AM, Glauber Costa <gla...@scylladb.com> wrote:
Available at:

        To g...@github.com:glommer/scylla.git    virtual-dirty-v3

Description:
============

Scylla currently suffers from a brick wall behavior of the request throttler.
Requests pile up until we reach the dirty memory limit, at which point we stop
serving them until we have freed enough memory to allow for more requests.

The problem is that freeing dirty memory means writing an SSTable to completion.
That can take a long time, even if we are blessed with great disks. Those long
waiting times can and will translate into timeouts. That is bad behavior.

What this patch does is introduce one form of virtual dirty memory accounting.
Instead of allowing 100 % of the dirty memory to be filled up until we stop
accepting requests, we will do that when we reach 50 % of memory. However,
instead of releasing requests only when an SSTable is fully written, we start
releasing them when some memory was written.

This indicates that scylla frees memory from memtable as write proceeds, right? What if sstable write fails? example: out of disk space... would we able to recover the mutations written to partial sstable and freed from memory?
 

--
2.5.5

--
You received this message because you are subscribed to the Google Groups "ScyllaDB development" group.
To unsubscribe from this group and stop receiving emails from it, send an email to scylladb-dev+unsubscribe@googlegroups.com.
To post to this group, send email to scylla...@googlegroups.com.

Raphael Carvalho

<raphaelsc@scylladb.com>
unread,
Sep 23, 2016, 3:21:02 PM9/23/16
to Glauber Costa, scylladb-dev
On Fri, Sep 23, 2016 at 4:19 PM, Raphael Carvalho <raph...@scylladb.com> wrote:


On Mon, Sep 19, 2016 at 10:51 AM, Glauber Costa <gla...@scylladb.com> wrote:
Available at:

        To g...@github.com:glommer/scylla.git    virtual-dirty-v3

Description:
============

Scylla currently suffers from a brick wall behavior of the request throttler.
Requests pile up until we reach the dirty memory limit, at which point we stop
serving them until we have freed enough memory to allow for more requests.

The problem is that freeing dirty memory means writing an SSTable to completion.
That can take a long time, even if we are blessed with great disks. Those long
waiting times can and will translate into timeouts. That is bad behavior.

What this patch does is introduce one form of virtual dirty memory accounting.
Instead of allowing 100 % of the dirty memory to be filled up until we stop
accepting requests, we will do that when we reach 50 % of memory. However,
instead of releasing requests only when an SSTable is fully written, we start
releasing them when some memory was written.

This indicates that scylla frees memory from memtable as write proceeds, right? What if sstable write fails? example: out of disk space... would we able to recover the mutations written to partial sstable and freed from memory?

Forgot about commitlog, sorry for the noise.

Glauber Costa

<glauber@scylladb.com>
unread,
Sep 23, 2016, 3:22:01 PM9/23/16
to Raphael Carvalho, scylladb-dev
No, we don't free memory. The whole point of virtual dirty is that the
memory is not freed, we just pretend it is.
>> email to scylladb-dev...@googlegroups.com.

Avi Kivity

<avi@scylladb.com>
unread,
Sep 23, 2016, 3:24:30 PM9/23/16
to Glauber Costa, Raphael Carvalho, scylladb-dev
However, if we fail, we undo our pretensions.

Glauber Costa

<glauber@scylladb.com>
unread,
Sep 23, 2016, 3:26:12 PM9/23/16
to Avi Kivity, Raphael Carvalho, scylladb-dev
Correct. When the flush writer is destroyed, it RAII-fully undo
everything it has done.

That is the case with failures, and with successful completion as
well: in the latter case we will be soon
freeing that memory for real.

>

Avi Kivity

<avi@scylladb.com>
unread,
Sep 23, 2016, 3:26:41 PM9/23/16
to Glauber Costa, scylladb-dev@googlegroups.com
Does the cfq fix (or switching schedulers) remove the remaining timeouts?

Glauber Costa

<glauber@scylladb.com>
unread,
Sep 23, 2016, 3:27:44 PM9/23/16
to Avi Kivity, scylladb-dev
I haven't yet tested. Give me a second.

Avi Kivity

<avi@scylladb.com>
unread,
Sep 23, 2016, 3:29:41 PM9/23/16
to Glauber Costa, scylladb-dev@googlegroups.com, Paweł Dziepak

Looks good to me. Paweł, you had some comments, were they addressed?

On 09/19/2016 04:51 PM, Glauber Costa wrote:

Avi Kivity

<avi@scylladb.com>
unread,
Sep 23, 2016, 3:34:03 PM9/23/16
to Glauber Costa, Raphael Carvalho, scylladb-dev
We should move the accounter to row_cache::update(), so this operation
and the real free can happen atomically. This will prevent a window
where we have incorrect accounting. If we real-free before we undo the
accounting, we have a window where too much memory is freed, leading to
to a burst of allocations which can later result in being blocked, and
if we do it afterwards, we block immediately.

Can be done as a follow-up. Note that row_cache::update() splits
freeing into multiple atomic sections so the accounter will have to
follow suit.

Glauber Costa

<glauber@scylladb.com>
unread,
Sep 23, 2016, 3:34:40 PM9/23/16
to Avi Kivity, scylladb-dev
But just note that even if they go away - and I think they will
because they were indeed
very likely a result of some flushes taking too long from what I have
seen, the problem
still *theoretically* exists.

It's just a very small window of a couple of ms that shouldn't be a
pratical issue - if the
elevator is fixed. I think it doesn't justify the complexity.

Glauber Costa

<glauber@scylladb.com>
unread,
Sep 23, 2016, 3:35:20 PM9/23/16
to Avi Kivity, Raphael Carvalho, scylladb-dev
But we not always have a cache. Streaming doesn't use the cache, and
it's possible,
although not common, to run with the cache disabled.

That's one of the reasons I haven't done anything like it.

Glauber Costa

<glauber@scylladb.com>
unread,
Sep 23, 2016, 3:39:47 PM9/23/16
to Avi Kivity, scylladb-dev
On Fri, Sep 23, 2016 at 3:27 PM, Glauber Costa <gla...@scylladb.com> wrote:
Yes, it does.

Avi Kivity

<avi@scylladb.com>
unread,
Sep 23, 2016, 3:40:35 PM9/23/16
to Glauber Costa, Raphael Carvalho, scylladb-dev
In that case, we don't move it anywhere.

It works out naturally. Pass the object to update_cache(). If the call
happens, update_cache will update the allocation counters as it moves
data from the memtable to cache. If the call doesn't happen (or throws
in the middle), the destructor will take care of things.

Avi Kivity

<avi@scylladb.com>
unread,
Sep 23, 2016, 3:41:27 PM9/23/16
to Glauber Costa, scylladb-dev
Excellent.

Avi Kivity

<avi@scylladb.com>
unread,
Sep 23, 2016, 3:43:18 PM9/23/16
to Glauber Costa, scylladb-dev
We could hack around it by leaving some safety buffer. If we have M
bytes for dirty memory, we can trigger a flush when real-dirty exceeds
(M - R) / 2, where R is some reserve in the few megabyte range (= few ms
even if the shard flushes at 1 GB/s).

Glauber Costa

<glauber@scylladb.com>
unread,
Sep 23, 2016, 3:45:32 PM9/23/16
to Avi Kivity, Raphael Carvalho, scylladb-dev
Sure, but that's only part of the problem.
Another part - and more important - is that if we are already at the
memtable limit, while the flush doesn't finish we don't virtually free
more memory.

Again, there are ways to mitigate this. One of them is virtually
accounting for the free_space at this point. But that involves
disabling compaction
in the region from that point on.

I have actually tried it, but it didn't fix the issues. Nor could it,
because I didn't know at the time but I do now, that some flushes (and
open, which we do a lot in a memtable flush) could take 20s to
complete. Nothing I could have done could have fixed that.

Now that I know the origin of this, I am less concerned. We can
mitigate this issue better in many other places (like you seem to have
realized in your following e-mail that popped up here)

Glauber Costa

<glauber@scylladb.com>
unread,
Sep 23, 2016, 3:47:35 PM9/23/16
to Avi Kivity, scylladb-dev
That's a separate issue, and I will certainly do that in my follow up
patches to remove the max_memtable_size and have scylla
autotune it.

But again, at some point we'll be at the limit, whichever limit we set.
We had 20s+ io thread calls, that's quite hard to fight that from our side...

Now the system is working really, really well.

>

Avi Kivity

<avi@scylladb.com>
unread,
Sep 23, 2016, 3:49:33 PM9/23/16
to Glauber Costa, scylladb-dev
I mean, when the hardware and kernel are working properly. Nothing can
prevent a timeout if they aren't.

We should allow a reasonable time for the flush and account for it, as
well as for the time it takes to real-free the memory.

Glauber Costa

<glauber@scylladb.com>
unread,
Sep 23, 2016, 3:52:38 PM9/23/16
to Avi Kivity, scylladb-dev
If all goes well those should take milliseconds. But the flush time matters less
if we are still able to keep requests flowing in that mean time.

And the way I intend to do this is by starting the flush of a new
memtable not when
we finish the current flush, but when we are close to the end of the
current one.

If we always have a memtable feeding virtual dirty to the system, even
if one of them
takes a little bit more time to flush this shouldn't be too much of an issue.

Paweł Dziepak

<pdziepak@scylladb.com>
unread,
Sep 27, 2016, 8:15:59 AM9/27/16
to Glauber Costa, scylladb-dev
I know that this is a preexisting problem and the solution doesn't belong to this series but if the range is (-inf, _last] then split_after() would produce (_last, _last] which is wrap-around and trying to create reader for such range won't end well.
 
--
2.5.5

--
You received this message because you are subscribed to the Google Groups "ScyllaDB development" group.
To unsubscribe from this group and stop receiving emails from it, send an email to scylladb-dev+unsubscribe@googlegroups.com.

To post to this group, send email to scylla...@googlegroups.com.

Paweł Dziepak

<pdziepak@scylladb.com>
unread,
Sep 27, 2016, 8:23:31 AM9/27/16
to Glauber Costa, scylladb-dev
On 19 September 2016 at 14:51, Glauber Costa <gla...@scylladb.com> wrote:
This is so we can template it without worrying about declaring the
specializations in the .cc file.


This is not a good thing to do from compilation time point of view, we already have too much stuff in headers. I don't think we expect to have more than three versions of snapshot reader – one with accounting and two without (normal and, in the future, reversed queries), so explicit instantiation in the source file shouldn't really be a burden.
 
--
2.5.5

--
You received this message because you are subscribed to the Google Groups "ScyllaDB development" group.
To unsubscribe from this group and stop receiving emails from it, send an email to scylladb-dev+unsubscribe@googlegroups.com.
To post to this group, send email to scylla...@googlegroups.com.

Paweł Dziepak

<pdziepak@scylladb.com>
unread,
Sep 27, 2016, 8:30:10 AM9/27/16
to Glauber Costa, scylladb-dev
On 19 September 2016 at 14:51, Glauber Costa <gla...@scylladb.com> wrote:
Looks like "no_accounter" magically appeared here while the implementation was being moved from .cc to .hh.
 
--
2.5.5

Paweł Dziepak

<pdziepak@scylladb.com>
unread,
Sep 27, 2016, 8:38:50 AM9/27/16
to Glauber Costa, scylladb-dev
On 19 September 2016 at 14:51, Glauber Costa <gla...@scylladb.com> wrote:
By default, we don't do any accounting. By specializing this class and providing
an accounter class, we can account how much memory are we reading as we read
through the elements.

Signed-off-by: Glauber Costa <gla...@scylladb.com>
---
 partition_version.hh | 42 ++++++++++++++++++++++++++++++++++++------
 1 file changed, 36 insertions(+), 6 deletions(-)

diff --git a/partition_version.hh b/partition_version.hh
index 6c57ab0..0bc5258 100644
--- a/partition_version.hh
+++ b/partition_version.hh
@@ -271,7 +271,15 @@ inline partition_version_ref& partition_snapshot::version()
     }
 }

-class partition_snapshot_reader : public streamed_mutation::impl {
+struct partition_snapshot_reader_dummy_accounter {
+   void operator()(const clustering_row& cr) {}
+   void operator()(const static_row& sr) {}
+   void operator()(const range_tombstone& rt) {}
+};
+extern partition_snapshot_reader_dummy_accounter no_accounter;

Apparently, no_accounter played tricks on you during rebase.
 
+
+template <typename MemoryAccounter = partition_snapshot_reader_dummy_accounter>
+class partition_snapshot_reader : public streamed_mutation::impl, public MemoryAccounter {
     struct rows_position {
         mutation_partition::rows_type::const_iterator _position;
         mutation_partition::rows_type::const_iterator _end;
@@ -308,6 +316,10 @@ class partition_snapshot_reader : public streamed_mutation::impl {
     logalloc::region& _lsa_region;
     logalloc::allocating_section& _read_section;

+    MemoryAccounter& mem_accounter() {
+        return *this;
+    }
+
     uint64_t _reclaim_counter;
     unsigned _version_count = 0;
 private:
@@ -384,11 +396,16 @@ class partition_snapshot_reader : public streamed_mutation::impl {
         return _range_tombstones.get_next();
     }

+    void emplace_mutation_fragment(mutation_fragment&& mfopt) {
+        mfopt.visit(mem_accounter());

Well, looks like we could generalise this to mutation_fragment_visitor since the reader can work with anything that visits mutation fragment even if it doesn't do any memory accounting. I am not sure, though, that there is much point in that.
 
+        _buffer.emplace_back(std::move(mfopt));

This is existing code but push_mutation_fragment() should be used here (and _buffer made private).

I just want to make sure you are aware that it may take some time until the mutation fragment queued in the buffer is actually consumed and written to the sstable. Basically, this means that the memory accounting is going to be done in a bursts of 16 mutation fragments and later when we make buffer size dynamic in a bursts of X kB. Still, I don't think it would ruin flush progress estimates.
And now it disappears!
 
 }
--
2.5.5

--
You received this message because you are subscribed to the Google Groups "ScyllaDB development" group.
To unsubscribe from this group and stop receiving emails from it, send an email to scylladb-dev+unsubscribe@googlegroups.com.
To post to this group, send email to scylla...@googlegroups.com.

Glauber Costa

<glauber@scylladb.com>
unread,
Sep 27, 2016, 8:50:45 AM9/27/16
to Paweł Dziepak, scylladb-dev

On Sep 27, 2016 8:38 AM, "Paweł Dziepak" <pdzi...@scylladb.com> wrote:
>
>
>
> On 19 September 2016 at 14:51, Glauber Costa <gla...@scylladb.com> wrote:
>>
>> By default, we don't do any accounting. By specializing this class and providing
>> an accounter class, we can account how much memory are we reading as we read
>> through the elements.
>>
>> Signed-off-by: Glauber Costa <gla...@scylladb.com>
>> ---
>>  partition_version.hh | 42 ++++++++++++++++++++++++++++++++++++------
>>  1 file changed, 36 insertions(+), 6 deletions(-)
>>
>> diff --git a/partition_version.hh b/partition_version.hh
>> index 6c57ab0..0bc5258 100644
>> --- a/partition_version.hh
>> +++ b/partition_version.hh
>> @@ -271,7 +271,15 @@ inline partition_version_ref& partition_snapshot::version()
>>      }
>>  }
>>
>> -class partition_snapshot_reader : public streamed_mutation::impl {
>> +struct partition_snapshot_reader_dummy_accounter {
>> +   void operator()(const clustering_row& cr) {}
>> +   void operator()(const static_row& sr) {}
>> +   void operator()(const range_tombstone& rt) {}
>> +};
>> +extern partition_snapshot_reader_dummy_accounter no_accounter;
>
>
> Apparently, no_accounter played tricks on you during rebase.
>  

Thanks for spotting it.

>>
>> +
>> +template <typename MemoryAccounter = partition_snapshot_reader_dummy_accounter>
>> +class partition_snapshot_reader : public streamed_mutation::impl, public MemoryAccounter {
>>      struct rows_position {
>>          mutation_partition::rows_type::const_iterator _position;
>>          mutation_partition::rows_type::const_iterator _end;
>> @@ -308,6 +316,10 @@ class partition_snapshot_reader : public streamed_mutation::impl {
>>      logalloc::region& _lsa_region;
>>      logalloc::allocating_section& _read_section;
>>
>> +    MemoryAccounter& mem_accounter() {
>> +        return *this;
>> +    }
>> +
>>      uint64_t _reclaim_counter;
>>      unsigned _version_count = 0;
>>  private:
>> @@ -384,11 +396,16 @@ class partition_snapshot_reader : public streamed_mutation::impl {
>>          return _range_tombstones.get_next();
>>      }
>>
>> +    void emplace_mutation_fragment(mutation_fragment&& mfopt) {
>> +        mfopt.visit(mem_accounter());
>
>
> Well, looks like we could generalise this to mutation_fragment_visitor since the reader can work with anything that visits mutation fragment even if it doesn't do any memory accounting. I am not sure, though, that there is much point in that.
>  
>>
>> +        _buffer.emplace_back(std::move(mfopt));
>
>
> This is existing code but push_mutation_fragment() should be used here (and _buffer made private).
>
> I just want to make sure you are aware that it may take some time until the mutation fragment queued in the buffer is actually consumed and written to the sstable. Basically, this means that the memory accounting is going to be done in a bursts of 16 mutation fragments and later when we make buffer size dynamic in a bursts of X kB. Still, I don't think it would ruin flush progress estimates.
>  

I am aware of that. It doesn't matter as long as the rate in which we consume is tied to the rate in which we write the sstables.

If we were to read the whole thing beforehand it wouldn't work. As we read on a need to use basis, it works - even if there is time elapsed between accounting and usage.

>> To unsubscribe from this group and stop receiving emails from it, send an email to scylladb-dev...@googlegroups.com.

Paweł Dziepak

<pdziepak@scylladb.com>
unread,
Sep 27, 2016, 8:52:21 AM9/27/16
to Avi Kivity, Glauber Costa, scylladb-dev
On 23 September 2016 at 20:29, Avi Kivity <a...@scylladb.com> wrote:

Looks good to me.  Paweł, you had some comments, were they addressed?

Looks good overall. I am not that insistent on not moving partition_snapshot_reader to header, but it seems to me that bisactibility is broken in this version ("no_accounter").

Avi Kivity

<avi@scylladb.com>
unread,
Sep 27, 2016, 9:55:45 AM9/27/16
to Glauber Costa, Paweł Dziepak, scylladb-dev
What's important is the batch size divided by the rate.  If we read and free 10 MB up-front and write at 100 MB/s, then we can induce latencies of up to 100 ms.  From the client side it will look as a bug of writes completing at very low latency, then a mini-wall waiting 100ms, then repeat.  We'll have to fine-tune it later, but the current state should already be much improved over what we have now.




Glauber Costa

<glauber@scylladb.com>
unread,
Sep 27, 2016, 12:10:12 PM9/27/16
to Paweł Dziepak, Avi Kivity, scylladb-dev
On Tue, Sep 27, 2016 at 8:52 AM, Paweł Dziepak <pdzi...@scylladb.com> wrote:
>
>
> On 23 September 2016 at 20:29, Avi Kivity <a...@scylladb.com> wrote:
>>
>>
>> Looks good to me. Paweł, you had some comments, were they addressed?
>
>
> Looks good overall. I am not that insistent on not moving
> partition_snapshot_reader to header, but it seems to me that bisactibility
> is broken in this version ("no_accounter").

Thanks Pawel. I'll fix the issue with no_accounter and resend.

Glauber Costa

<glauber@scylladb.com>
unread,
Sep 27, 2016, 1:24:19 PM9/27/16
to scylladb-dev@googlegroups.com, Glauber Costa
Available at:

To g...@github.com:glommer/scylla.git virtual-dirty-v4
components at this point. This happened in practice with a buggy kernel that
would result in flushes taking a long time.

After that is fixed, this is just a theoretical problem and in practice it
shouldn't matter given the time we expect those operations to take.

Changes from V3:
================
- Fixed a bisectability issue pointed out by Pawel
- Incorporated a suggestion by Pawel, and now _buffer is a private (not protected)
member of the streamed_mutation class.

Changes from V2:
================
- The delay-blocked_requests_max counter is removed. It is important to have but
not crucial, and in its current form it lacks important information about peaks.
It will be done later as a quantile.
- Revert back to accounting mutation fragments.
- Fix the problem with accounting snapshots, which can lead to overaccounting. Do
this by reverting back to accounting MFs). We lose a bit of accuracy, but there
seem to be no simple way to properly handle snapshot partitions otherwise.

Changes from V1:
================
- Only used space is accounted for, and we don't account for padding, or free
space in the region. While this slow us down a bit, it is not significant and
it simplifies the code a lot - since we don't have to have extra protections
against compactions
- get rid of virtual functions
- account objects before they are transformed into a mutation fragment -
guaranteeing that they will be in the correct region.

Changes from RFC:
=================
- The accounting was moved to the reader side (mutation_reader), instead of being
done by at the SSTable side.
- Accounting of relevant internal structures (memtable_entry, rows entry) is more
precise.

Glauber Costa (10):
database: export virtual dirty bytes region group
LSA: export information about size of the throttle queue
LSA: export information about object memory footprint
sstables: use special reader for writing a memtable
memtables: split scanning reader in two
LSA: allow a group to query its own region group
move partition_snapshot_reader code to header file
add accounting of memory read to partition_snapshot_reader
streamed_mutation: make _buffer private
database: allow virtual dirty memory management

database.hh | 25 ++++++
memtable.hh | 5 +-
partition_version.hh | 204 ++++++++++++++++++++++++++++++++++++++++---
streamed_mutation.hh | 2 +-
utils/allocation_strategy.hh | 9 ++
utils/logalloc.hh | 6 ++
database.cc | 37 +++++++-
memtable.cc | 199 +++++++++++++++++++++++++++++++++++------
partition_version.cc | 178 -------------------------------------
sstables/sstables.cc | 2 +-
utils/logalloc.cc | 23 ++++-
11 files changed, 463 insertions(+), 227 deletions(-)

--
2.5.5

Glauber Costa

<glauber@scylladb.com>
unread,
Sep 27, 2016, 1:24:19 PM9/27/16
to scylladb-dev@googlegroups.com, Glauber Costa
Currently, we export the region group where memtables are placed as dirty bytes.
Upcoming patches will optimistically mark some bytes in this region as free, a
scheme we know as "virtual dirty".

We are still interested in knowing the real state of the dirty region, so we
will keep track of the bytes virtually freed and split the counters in two.

Signed-off-by: Glauber Costa <gla...@scylladb.com>
---
database.hh | 25 +++++++++++++++++++++++++
database.cc | 10 +++++++++-
2 files changed, 34 insertions(+), 1 deletion(-)

diff --git a/database.hh b/database.hh
index 8d9eb30..b7ed3c5 100644
--- a/database.hh
+++ b/database.hh
@@ -74,6 +74,7 @@
#include <seastar/core/rwlock.hh>
#include <seastar/core/shared_future.hh>
#include "tracing/trace_state.hh"
+#include <boost/intrusive/parent_from_member.hpp>

class frozen_mutation;
class reconcilable_result;
@@ -133,6 +134,7 @@ class dirty_memory_manager: public logalloc::region_group_reclaimer {
// default values here.
size_t _concurrency;
semaphore _flush_serializer;
+ int64_t _dirty_bytes_released_pre_accounted = 0;

seastar::gate _waiting_flush_gate;
std::vector<shared_memtable> _pending_flushes;
@@ -156,6 +158,11 @@ class dirty_memory_manager: public logalloc::region_group_reclaimer {
, _region_group(&parent->_region_group, *this)
, _concurrency(concurrency)
, _flush_serializer(concurrency) {}
+
+ static dirty_memory_manager& from_region_group(logalloc::region_group *rg) {
+ return *(boost::intrusive::get_parent_from_member(rg, &dirty_memory_manager::_region_group));
+ }
+
logalloc::region_group& region_group() {
return _region_group;
}
@@ -164,6 +171,24 @@ class dirty_memory_manager: public logalloc::region_group_reclaimer {
return _region_group;
}

+ void revert_potentially_cleaned_up_memory(int64_t delta) {
+ _region_group.update(delta);
+ _dirty_bytes_released_pre_accounted -= delta;
+ }
+
+ void account_potentially_cleaned_up_memory(int64_t delta) {
+ _region_group.update(-delta);
+ _dirty_bytes_released_pre_accounted += delta;
+ }
+
+ size_t real_dirty_memory() const {
+ return _region_group.memory_used() + _dirty_bytes_released_pre_accounted;
+ }
+
+ size_t virtual_dirty_memory() const {
+ return _region_group.memory_used();
+ }
+
template <typename Func>
future<> serialize_flush(Func&& func) {
return seastar::with_gate(_waiting_flush_gate, [this, func] () mutable {
diff --git a/database.cc b/database.cc
index c9519b2..3714385 100644
--- a/database.cc
+++ b/database.cc
@@ -1601,7 +1601,15 @@ database::setup_collectd() {
, scollectd::per_cpu_plugin_instance
, "bytes", "dirty")
, scollectd::make_typed(scollectd::data_type::GAUGE, [this] {
- return dirty_memory_region_group().memory_used();
+ return _dirty_memory_manager.real_dirty_memory();
+ })));
+
+ _collectd.push_back(
+ scollectd::add_polled_metric(scollectd::type_instance_id("memory"
+ , scollectd::per_cpu_plugin_instance
+ , "bytes", "virtual_dirty")
+ , scollectd::make_typed(scollectd::data_type::GAUGE, [this] {
+ return _dirty_memory_manager.virtual_dirty_memory();
})));

_collectd.push_back(
--
2.5.5

Glauber Costa

<glauber@scylladb.com>
unread,
Sep 27, 2016, 1:24:19 PM9/27/16
to scylladb-dev@googlegroups.com, Glauber Costa
Also add information about for how long has the oldest been sitting in the
queue. This is part of the backpressure work to allow us to throttle incoming
requests if we won't have memory to process them. Shortages can happen in all
sorts of places, and it is useful when designing and testing the solutions to
know where they are, and how bad they are.

This counter is named for consistency after similar counters from transport/.

Signed-off-by: Glauber Costa <gla...@scylladb.com>
---
utils/logalloc.hh | 4 ++++
database.cc | 9 +++++++++
2 files changed, 13 insertions(+)

diff --git a/utils/logalloc.hh b/utils/logalloc.hh
index db8ab35..1b104ec 100644
--- a/utils/logalloc.hh
+++ b/utils/logalloc.hh
@@ -318,6 +318,10 @@ class region_group {
_shutdown_requested = true;
return _asynchronous_gate.close();
}
+
+ size_t blocked_requests() {
+ return _blocked_requests.size();
+ }
private:
// Make sure we get a notification and can call release_requests when one of our ancestors that
// used to block us is no longer under memory pressure.
diff --git a/database.cc b/database.cc
index 3714385..e7be0b0 100644
--- a/database.cc
+++ b/database.cc
@@ -1620,6 +1620,15 @@ database::setup_collectd() {
));

_collectd.push_back(
+ scollectd::add_polled_metric(scollectd::type_instance_id("database"
+ , scollectd::per_cpu_plugin_instance
+ , "queue_length", "requests_blocked_memory")
+ , scollectd::make_typed(scollectd::data_type::GAUGE, [this] {
+ return _dirty_memory_manager.region_group().blocked_requests();
+ })
+ ));
+
+ _collectd.push_back(
scollectd::add_polled_metric(scollectd::type_instance_id("memtables"
, scollectd::per_cpu_plugin_instance
, "bytes", "pending_flushes")
--
2.5.5

Glauber Costa

<glauber@scylladb.com>
unread,
Sep 27, 2016, 1:24:20 PM9/27/16
to scylladb-dev@googlegroups.com, Glauber Costa
We allocate objects of a certain size, but we use a bit more memory to hold
them. To get a clerer picture about how much memory will an object cost us, we
need help from the allocator. This patch exports an interface that allow users
to query into a specific allocator to get that information.

Signed-off-by: Glauber Costa <gla...@scylladb.com>
---
utils/allocation_strategy.hh | 9 +++++++++
utils/logalloc.cc | 15 +++++++++++++--
2 files changed, 22 insertions(+), 2 deletions(-)

diff --git a/utils/allocation_strategy.hh b/utils/allocation_strategy.hh
index bc5613a..c223db1 100644
--- a/utils/allocation_strategy.hh
+++ b/utils/allocation_strategy.hh
@@ -97,6 +97,11 @@ class allocation_strategy {
// Doesn't invalidate references to objects allocated with this strategy.
virtual void free(void*) = 0;

+ // Returns the total memory size used by the allocator to host this object.
+ // This will be at least the size of the object itself, plus the overhead, if any,
+ // to represent the object.
+ virtual size_t object_memory_size_in_allocator(const void* obj) const noexcept = 0;
+
// Like alloc() but also constructs the object with a migrator using
// standard move semantics. Allocates respecting object's alignment
// requirement.
@@ -138,6 +143,10 @@ class standard_allocation_strategy : public allocation_strategy {
virtual void free(void* obj) override {
::free(obj);
}
+
+ virtual size_t object_memory_size_in_allocator(const void* obj) const noexcept {
+ return ::malloc_usable_size(const_cast<void *>(obj));
+ }
};

extern standard_allocation_strategy standard_allocation_strategy_instance;
diff --git a/utils/logalloc.cc b/utils/logalloc.cc
index 837c3cb..557ebc7 100644
--- a/utils/logalloc.cc
+++ b/utils/logalloc.cc
@@ -517,7 +517,7 @@ class segment_pool {
segment* new_segment(region::impl* r);
segment_descriptor& descriptor(const segment*);
// Returns segment containing given object or nullptr.
- segment* containing_segment(void* obj) const;
+ segment* containing_segment(const void* obj) const;
void free_segment(segment*) noexcept;
void free_segment(segment*, segment_descriptor&) noexcept;
size_t segments_in_use() const;
@@ -720,7 +720,7 @@ segment_pool::descriptor(const segment* seg) {
}

segment*
-segment_pool::containing_segment(void* obj) const {
+segment_pool::containing_segment(const void* obj) const {
auto addr = reinterpret_cast<uintptr_t>(obj);
auto offset = addr & (segment::size - 1);
auto index = (addr - _segments_base) >> segment::size_shift;
@@ -1380,6 +1380,17 @@ class region_impl : public allocation_strategy {
}
}

+ virtual size_t object_memory_size_in_allocator(const void* obj) const noexcept override {
+ segment* seg = shard_segment_pool.containing_segment(obj);
+
+ if (!seg) {
+ return standard_allocator().object_memory_size_in_allocator(obj);
+ } else {
+ auto desc = reinterpret_cast<object_descriptor*>(reinterpret_cast<uintptr_t>(obj) - sizeof(object_descriptor));
+ return sizeof(object_descriptor) + desc->size();
+ }
+ }
+
// Merges another region into this region. The other region is made
// to refer to this region.
// Doesn't invalidate references to allocated objects.
--
2.5.5

Glauber Costa

<glauber@scylladb.com>
unread,
Sep 27, 2016, 1:24:20 PM9/27/16
to scylladb-dev@googlegroups.com, Glauber Costa
Right now the special reader doesn't do much, but the idea is that we will
soon replace it will a reader that specializes in flush, and is in turn able
to provide read-side on-flush functionality like virtual dirty.

Signed-off-by: Glauber Costa <gla...@scylladb.com>
---
memtable.hh | 3 +++
memtable.cc | 5 +++++
sstables/sstables.cc | 2 +-
3 files changed, 9 insertions(+), 1 deletion(-)

diff --git a/memtable.hh b/memtable.hh
index c6f9960..98fc7eb 100644
--- a/memtable.hh
+++ b/memtable.hh
@@ -147,6 +147,9 @@ class memtable final : public enable_lw_shared_from_this<memtable>, private loga
const query::partition_slice& slice = query::full_slice,
const io_priority_class& pc = default_priority_class());

+
+ mutation_reader make_flush_reader(schema_ptr);
+
mutation_source as_data_source();
key_source as_key_source();

diff --git a/memtable.cc b/memtable.cc
index ac6b04a..93f3d02 100644
--- a/memtable.cc
+++ b/memtable.cc
@@ -217,6 +217,11 @@ memtable::make_reader(schema_ptr s,
}
}

+mutation_reader
+memtable::make_flush_reader(schema_ptr s) {
+ return make_mutation_reader<scanning_reader>(std::move(s), shared_from_this(), query::full_partition_range, query::no_clustering_key_filtering, default_priority_class());
+}
+
void
memtable::update(const db::replay_position& rp) {
if (_replay_position < rp) {
diff --git a/sstables/sstables.cc b/sstables/sstables.cc
index e87515a..e5a2cb3 100644
--- a/sstables/sstables.cc
+++ b/sstables/sstables.cc
@@ -1781,7 +1781,7 @@ void components_writer::consume_end_of_stream() {

future<> sstable::write_components(memtable& mt, bool backup, const io_priority_class& pc, bool leave_unsealed) {
_collector.set_replay_position(mt.replay_position());
- return write_components(mt.make_reader(mt.schema()),
+ return write_components(mt.make_flush_reader(mt.schema()),
mt.partition_count(), mt.schema(), std::numeric_limits<uint64_t>::max(), backup, pc, leave_unsealed);
}

--
2.5.5

Glauber Costa

<glauber@scylladb.com>
unread,
Sep 27, 2016, 1:24:21 PM9/27/16
to scylladb-dev@googlegroups.com, Glauber Costa
The code that is common will live in its own reader, the iterator_reader. All
friendly private access to memtable attributes and methods happen through the
iterator reader.

After this patch, we are now left with the scanning_reader - same as always,
but now implemented on top of the iterator_reader, and a flush_reader, which
will be used by SSTable flushes only.

Signed-off-by: Glauber Costa <gla...@scylladb.com>
---
memtable.hh | 2 +-
memtable.cc | 127 ++++++++++++++++++++++++++++++++++++++++++++++--------------
2 files changed, 98 insertions(+), 31 deletions(-)

diff --git a/memtable.hh b/memtable.hh
index 98fc7eb..a0a5cbd 100644
--- a/memtable.hh
+++ b/memtable.hh
@@ -161,5 +161,5 @@ class memtable final : public enable_lw_shared_from_this<memtable>, private loga
return _replay_position;
}

- friend class scanning_reader;
+ friend class iterator_reader;
};
diff --git a/memtable.cc b/memtable.cc
index 93f3d02..3a1743b 100644
--- a/memtable.cc
+++ b/memtable.cc
+ }
+ return {};
+ }
+
+ mutation_reader delegate_reader(const query::partition_range& delegate,
+ {}
+
+ virtual future<streamed_mutation_opt> operator()() override {
+ logalloc::reclaim_lock _(region());
+ managed_bytes::linearization_context_guard lcg;
+ memtable_entry* e = fetch_next_entry();
+ if (!e) {
return make_ready_future<streamed_mutation_opt>(stdx::nullopt);
+ } else {
+ return make_ready_future<streamed_mutation_opt>((*e).read(mtbl(), schema(), query::full_slice));
}
- memtable_entry& e = *_i;
- ++_i;
- _last = e.key();
- _memtable->upgrade_entry(e);
- return make_ready_future<streamed_mutation_opt>(e.read(_memtable, _schema, _slice));
}
};

@@ -219,7 +286,7 @@ memtable::make_reader(schema_ptr s,

mutation_reader
memtable::make_flush_reader(schema_ptr s) {
- return make_mutation_reader<scanning_reader>(std::move(s), shared_from_this(), query::full_partition_range, query::no_clustering_key_filtering, default_priority_class());
+ return make_mutation_reader<flush_reader>(std::move(s), shared_from_this());
}

void
--
2.5.5

Glauber Costa

<glauber@scylladb.com>
unread,
Sep 27, 2016, 1:24:22 PM9/27/16
to scylladb-dev@googlegroups.com, Glauber Costa
Signed-off-by: Glauber Costa <gla...@scylladb.com>
---
utils/logalloc.hh | 2 ++
utils/logalloc.cc | 8 ++++++++
2 files changed, 10 insertions(+)

diff --git a/utils/logalloc.hh b/utils/logalloc.hh
index 1b104ec..a38307a 100644
--- a/utils/logalloc.hh
+++ b/utils/logalloc.hh
@@ -566,6 +566,8 @@ class region {

allocation_strategy& allocator();

+ region_group* group();
+
// Merges another region into this region. The other region is left empty.
// Doesn't invalidate references to allocated objects.
void merge(region& other);
diff --git a/utils/logalloc.cc b/utils/logalloc.cc
index 557ebc7..83c783c 100644
--- a/utils/logalloc.cc
+++ b/utils/logalloc.cc
@@ -1295,6 +1295,10 @@ class region_impl : public allocation_strategy {
return total;
}

+ region_group* group() {
+ return _group;
+ }
+
occupancy_stats compactible_occupancy() const {
return _closed_occupancy;
}
@@ -1622,6 +1626,10 @@ occupancy_stats region::occupancy() const {
return _impl->occupancy();
}

+region_group* region::group() {
+ return _impl->group();
+}
+
void region::merge(region& other) {
if (_impl != other._impl) {
_impl->merge(*other._impl);
--
2.5.5

Glauber Costa

<glauber@scylladb.com>
unread,
Sep 27, 2016, 1:24:23 PM9/27/16
to scylladb-dev@googlegroups.com, Glauber Costa
This is so we can template it without worrying about declaring the
specializations in the .cc file.

Signed-off-by: Glauber Costa <gla...@scylladb.com>
---
partition_version.hh | 172 +++++++++++++++++++++++++++++++++++++++++++++----
partition_version.cc | 178 ---------------------------------------------------
2 files changed, 161 insertions(+), 189 deletions(-)

diff --git a/partition_version.hh b/partition_version.hh
index 45a3e13..2722df6 100644
--- a/partition_version.hh
+++ b/partition_version.hh
@@ -311,23 +311,173 @@ class partition_snapshot_reader : public streamed_mutation::impl {
uint64_t _reclaim_counter;
unsigned _version_count = 0;
private:
- void refresh_iterators();
- void pop_clustering_row();
+ void refresh_iterators() {
+ _clustering_rows.clear();

- mutation_fragment_opt read_static_row();
- mutation_fragment_opt read_next();
- void do_fill_buffer();
- static tombstone tomb(partition_snapshot& snp);
+ if (!_in_ck_range && _current_ck_range == _ck_range_end) {
+ return;
+ }
+
+ for (auto&& v : _snapshot->versions()) {
partition_snapshot_reader(schema_ptr s, dht::decorated_key dk, lw_shared_ptr<partition_snapshot> snp,
query::clustering_key_filter_ranges crr,
logalloc::region& region, logalloc::allocating_section& read_section,
- boost::any pointer_to_container);
- ~partition_snapshot_reader();
- virtual future<> fill_buffer() override;
+ boost::any pointer_to_container)
+ : streamed_mutation::impl(s, std::move(dk), tomb(*snp))
+ , _container_guard(std::move(pointer_to_container))
+ }
+
+ virtual future<> fill_buffer() override {
+ return _read_section(_lsa_region, [&] {
+ return with_linearized_managed_bytes([&] {
+ do_fill_buffer();
+ return make_ready_future<>();
+ });
+ });
+ }
};

-streamed_mutation make_partition_snapshot_reader(schema_ptr s, dht::decorated_key dk,
+inline streamed_mutation
+make_partition_snapshot_reader(schema_ptr s, dht::decorated_key dk,
query::clustering_key_filter_ranges crr,
lw_shared_ptr<partition_snapshot> snp, logalloc::region& region,
- logalloc::allocating_section& read_section, boost::any pointer_to_container);
+ logalloc::allocating_section& read_section, boost::any pointer_to_container)
+{
+ return make_streamed_mutation<partition_snapshot_reader>(s, std::move(dk),
+ snp, std::move(crr), region, read_section, std::move(pointer_to_container));
+}
diff --git a/partition_version.cc b/partition_version.cc
index 0a92a14..e61acde 100644
--- a/partition_version.cc
+++ b/partition_version.cc
@@ -291,181 +291,3 @@ lw_shared_ptr<partition_snapshot> partition_entry::read(schema_ptr entry_schema)
return snp;
}
}
-
-partition_snapshot_reader::partition_snapshot_reader(schema_ptr s, dht::decorated_key dk,
- lw_shared_ptr<partition_snapshot> snp,
- query::clustering_key_filter_ranges crr, logalloc::region& region,
- logalloc::allocating_section& read_section, boost::any pointer_to_container)
- : streamed_mutation::impl(s, std::move(dk), tomb(*snp))
- if (mfopt) {
- _buffer.emplace_back(std::move(*mfopt));
- }
- }
-
- if (!_in_ck_range || _lsa_region.reclaim_counter() != _reclaim_counter || _snapshot->version_count() != _version_count) {
- refresh_iterators();
- _reclaim_counter = _lsa_region.reclaim_counter();
- _version_count = _snapshot->version_count();
- }
-
- while (!is_end_of_stream() && !is_buffer_full()) {
- if (_in_ck_range && _clustering_rows.empty()) {
- _in_ck_range = false;
- _current_ck_range = std::next(_current_ck_range);
- refresh_iterators();
- continue;
- }
-
- auto mfopt = read_next();
- if (mfopt) {
- _buffer.emplace_back(std::move(*mfopt));
- } else {
- _end_of_stream = true;
- }
- }
-}
-
-future<> partition_snapshot_reader::fill_buffer()
-{
- return _read_section(_lsa_region, [&] {
- return with_linearized_managed_bytes([&] {
- do_fill_buffer();
- return make_ready_future<>();
- });
- });
-}
-
-streamed_mutation make_partition_snapshot_reader(schema_ptr s, dht::decorated_key dk,
- query::clustering_key_filter_ranges crr,
- lw_shared_ptr<partition_snapshot> snp, logalloc::region& region,
- logalloc::allocating_section& read_section, boost::any pointer_to_container)
-{
- return make_streamed_mutation<partition_snapshot_reader>(s, std::move(dk),
- snp, std::move(crr), region, read_section, std::move(pointer_to_container));
-}
--
2.5.5

Glauber Costa

<glauber@scylladb.com>
unread,
Sep 27, 2016, 1:24:24 PM9/27/16
to scylladb-dev@googlegroups.com, Glauber Costa
It is currently protected, but now all users go through
push_mutation_fragment(). So we can safely move its visibility to guarantee
that it stays that way.

Signed-off-by: Glauber Costa <gla...@scylladb.com>
---
streamed_mutation.hh | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/streamed_mutation.hh b/streamed_mutation.hh
index adb8b17..7ee8f4c 100644
--- a/streamed_mutation.hh
+++ b/streamed_mutation.hh
@@ -399,6 +399,7 @@ class streamed_mutation {
// supposed to fill a buffer with mutation fragments until is_buffer_full()
// or end of stream is encountered.
class impl {
+ circular_buffer<mutation_fragment> _buffer;
protected:
// FIXME: use size in bytes of the mutation_fragments
static constexpr size_t buffer_size = 16;
@@ -408,7 +409,6 @@ class streamed_mutation {
tombstone _partition_tombstone;

bool _end_of_stream = false;
- circular_buffer<mutation_fragment> _buffer;

friend class streamed_mutation;
protected:
--
2.5.5

Glauber Costa

<glauber@scylladb.com>
unread,
Sep 27, 2016, 1:24:24 PM9/27/16
to scylladb-dev@googlegroups.com, Glauber Costa
By default, we don't do any accounting. By specializing this class and providing
an accounter class, we can account how much memory are we reading as we read
through the elements.

Signed-off-by: Glauber Costa <gla...@scylladb.com>
---
partition_version.hh | 40 +++++++++++++++++++++++++++++++++++-----
1 file changed, 35 insertions(+), 5 deletions(-)

diff --git a/partition_version.hh b/partition_version.hh
index 2722df6..418a0e1 100644
--- a/partition_version.hh
+++ b/partition_version.hh
@@ -271,7 +271,15 @@ inline partition_version_ref& partition_snapshot::version()
}
}

-class partition_snapshot_reader : public streamed_mutation::impl {
+struct partition_snapshot_reader_dummy_accounter {
+ void operator()(const clustering_row& cr) {}
+ void operator()(const static_row& sr) {}
+ void operator()(const range_tombstone& rt) {}
+};
+extern partition_snapshot_reader_dummy_accounter no_accounter;
+
+template <typename MemoryAccounter = partition_snapshot_reader_dummy_accounter>
+class partition_snapshot_reader : public streamed_mutation::impl, public MemoryAccounter {
struct rows_position {
mutation_partition::rows_type::const_iterator _position;
mutation_partition::rows_type::const_iterator _end;
@@ -308,6 +316,10 @@ class partition_snapshot_reader : public streamed_mutation::impl {
logalloc::region& _lsa_region;
logalloc::allocating_section& _read_section;

+ MemoryAccounter& mem_accounter() {
+ return *this;
+ }
+
uint64_t _reclaim_counter;
unsigned _version_count = 0;
private:
@@ -384,11 +396,16 @@ class partition_snapshot_reader : public streamed_mutation::impl {
return _range_tombstones.get_next();
}

+ void emplace_mutation_fragment(mutation_fragment&& mfopt) {
+ mfopt.visit(mem_accounter());
+ push_mutation_fragment(std::move(mfopt));
+ }
+
void do_fill_buffer() {
if (!_last_entry) {
auto mfopt = read_static_row();
if (mfopt) {
- _buffer.emplace_back(std::move(*mfopt));
+ emplace_mutation_fragment(std::move(*mfopt));
}
}

@@ -408,7 +425,7 @@ class partition_snapshot_reader : public streamed_mutation::impl {

auto mfopt = read_next();
if (mfopt) {
- _buffer.emplace_back(std::move(*mfopt));
+ emplace_mutation_fragment(std::move(*mfopt));
} else {
_end_of_stream = true;
}
@@ -423,11 +440,13 @@ class partition_snapshot_reader : public streamed_mutation::impl {
return t;
}
public:
+ template <typename... Args>
partition_snapshot_reader(schema_ptr s, dht::decorated_key dk, lw_shared_ptr<partition_snapshot> snp,
query::clustering_key_filter_ranges crr,
logalloc::region& region, logalloc::allocating_section& read_section,
- boost::any pointer_to_container)
+ boost::any pointer_to_container, Args&&... args)
: streamed_mutation::impl(s, std::move(dk), tomb(*snp))
+ , MemoryAccounter(std::forward<Args>(args)...)
, _container_guard(std::move(pointer_to_container))
, _ck_ranges(std::move(crr))
, _current_ck_range(_ck_ranges.begin())
@@ -472,12 +491,23 @@ class partition_snapshot_reader : public streamed_mutation::impl {
}
};

+template <typename MemoryAccounter, typename... Args>
+inline streamed_mutation
+make_partition_snapshot_reader(schema_ptr s, dht::decorated_key dk,
+ query::clustering_key_filter_ranges crr,
+ lw_shared_ptr<partition_snapshot> snp, logalloc::region& region,
+ logalloc::allocating_section& read_section, boost::any pointer_to_container, Args&&... args)
+{
+ return make_streamed_mutation<partition_snapshot_reader<MemoryAccounter>>(s, std::move(dk),
+ snp, std::move(crr), region, read_section, std::move(pointer_to_container), std::forward<Args>(args)...);
+}
+
inline streamed_mutation
make_partition_snapshot_reader(schema_ptr s, dht::decorated_key dk,
query::clustering_key_filter_ranges crr,
lw_shared_ptr<partition_snapshot> snp, logalloc::region& region,
logalloc::allocating_section& read_section, boost::any pointer_to_container)
{
- return make_streamed_mutation<partition_snapshot_reader>(s, std::move(dk),
+ return make_streamed_mutation<partition_snapshot_reader<>>(s, std::move(dk),
snp, std::move(crr), region, read_section, std::move(pointer_to_container));
}
--
2.5.5

Glauber Costa

<glauber@scylladb.com>
unread,
Sep 27, 2016, 1:24:25 PM9/27/16
to scylladb-dev@googlegroups.com, Glauber Costa
Scylla currently suffers from a brick wall behavior of the request throttler.
Requests pile up until we reach the dirty memory limit, at which point we stop
serving them until we have freed enough memory to allow for more requests.

The problem is that freeing dirty memory means writing an SSTable to completion.
That can take a long time, even if we are blessed with great disks. Those long
waiting times can and will translate into timeouts. That is bad behavior.

What this patch does is introduce one form of virtual dirty memory accounting.
Instead of allowing 100 % of the dirty memory to be filled up until we stop
accepting requests, we will do that when we reach 50 % of memory. However,
instead of releasing requests only when an SSTable is fully written, we start
releasing them when some memory was written.

The practical effect of that is that once we reach 50 % occupancy in our dirty
memory region, we will bring the system from CPU speed to disk speed, and will
start accepting requests only at the rate we are able to write memory back.

Signed-off-by: Glauber Costa <gla...@scylladb.com>
---
database.cc | 18 ++++++++++++++--
memtable.cc | 71 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
2 files changed, 86 insertions(+), 3 deletions(-)

diff --git a/database.cc b/database.cc
index e7be0b0..344c88e 100644
--- a/database.cc
+++ b/database.cc
@@ -1583,8 +1583,22 @@ database::database(const db::config& cfg)
// in a different region group. This is because throttled requests are serviced in FIFO order,
// and we don't want system requests to be waiting for a long time behind user requests.
, _system_dirty_memory_manager(*this, _memtable_total_space + (10 << 20))
- , _dirty_memory_manager(*this, &_system_dirty_memory_manager, _memtable_total_space)
- , _streaming_dirty_memory_manager(*this, &_dirty_memory_manager, _streaming_memtable_total_space)
+ // The total space that can be used by memtables is _memtable_total_space, but we will only
+ // allow the region_group to grow to half of that. This is because of virtual_dirty: memtables
+ // can take a long time to flush, and if we are using the maximum amount of memory possible,
+ // then requests will block until we finish flushing at least one memtable.
+ //
+ // We can free memory until the whole memtable is flushed because we need to keep it in memory
+ // until the end, but we can fake freeing memory. When we are done with an element of the
+ // memtable, we will update the region group pretending memory just went down by that amount.
+ //
+ // Because the amount of memory that we pretend to free should be close enough to the actual
+ // memory used by the memtables, that effectively creates two sub-regions inside the dirty
+ // region group, of equal size. In the worst case, we will have _memtable_total_space dirty
+ // bytes used, and half of that already virtually freed.
+ , _dirty_memory_manager(*this, &_system_dirty_memory_manager, _memtable_total_space / 2)
+ // The same goes for streaming in respect to virtual dirty.
+ , _streaming_dirty_memory_manager(*this, &_dirty_memory_manager, _streaming_memtable_total_space / 2)
, _version(empty_version)
, _enable_incremental_backups(cfg.incremental_backups())
{
diff --git a/memtable.cc b/memtable.cc
index 3a1743b..33bf39d 100644
--- a/memtable.cc
+++ b/memtable.cc
@@ -20,6 +20,7 @@
*/

#include "memtable.hh"
+#include "database.hh"
#include "frozen_mutation.hh"
#include "sstable_mutation_readers.hh"

@@ -240,11 +241,75 @@ class scanning_reader final: public iterator_reader {
}
};

+class flush_memory_accounter {
+ uint64_t _bytes_read = 0;
+ logalloc::region& _region;
+
+public:
+ void update_bytes_read(uint64_t delta) {
+ _bytes_read += delta;
+ dirty_memory_manager::from_region_group(_region.group()).account_potentially_cleaned_up_memory(delta);
+ }
+
+ explicit flush_memory_accounter(logalloc::region& region)
+ : _region(region)
+ {}
+
+ ~flush_memory_accounter() {
+ assert(_bytes_read <= _region.occupancy().used_space());
+ dirty_memory_manager::from_region_group(_region.group()).revert_potentially_cleaned_up_memory(_bytes_read);
+ }
+ void account_component(memtable_entry& e) {
+ auto delta = with_allocator(_region.allocator(), [&e] () -> uint64_t {
+ return current_allocator().object_memory_size_in_allocator(&e) +
+ current_allocator().object_memory_size_in_allocator(&*(partition_snapshot(e.schema(), &(e.partition())).version())) +
+ e.key().key().memory_usage();
+ });
+ update_bytes_read(delta);
+ }
+};
+
+class partition_snapshot_accounter {
+ flush_memory_accounter& _accounter;
+public:
+ partition_snapshot_accounter(flush_memory_accounter& acct): _accounter(acct) {}
+
+ // We will be passed mutation fragments here, and they are allocated using the standard
+ // allocator. So we can't compute the size in memtable precisely. However, precise accounting is
+ // hard anyway, since we may be holding multiple snapshots of the partitions, and the
+ // partition_snapshot_reader may compose them. In doing so, we move memory to the standard
+ // allocation. As long as our size read here is lesser or equal to the size in the memtables, we
+ // are safe, and worst case we will allow a bit fewer requests in.
+ void operator()(const range_tombstone& rt) {
+ _accounter.update_bytes_read(rt.memory_usage());
+ }
+
+ void operator()(const static_row& sr) {
+ _accounter.update_bytes_read(sr.memory_usage());
+ }
+
+ void operator()(const clustering_row& cr) {
+ // Every clustering row is stored in a rows_entry object, and that has some significant
+ // overhead - so add it here. We will be a bit short on our estimate because we can't know
+ // what is the size in the allocator for this rows_entry object: we may have many snapshots,
+ // and we don't know which one(s) contributed to the generation of this mutation fragment.
+ //
+ // We will add the size of the struct here, and that should be good enough.
+ _accounter.update_bytes_read(sizeof(rows_entry) + cr.memory_usage());
+ }
+};
+
class flush_reader final : public iterator_reader {
+ flush_memory_accounter _flushed_memory;
public:
flush_reader(schema_ptr s, lw_shared_ptr<memtable> m)
: iterator_reader(std::move(s), std::move(m), query::full_partition_range)
+ , _flushed_memory(region())
{}
+ flush_reader(const flush_reader&) = delete;
+ flush_reader(flush_reader&&) = delete;
+ flush_reader& operator=(flush_reader&&) = delete;
+ flush_reader& operator=(const flush_reader&) = delete;

virtual future<streamed_mutation_opt> operator()() override {
logalloc::reclaim_lock _(region());
@@ -253,7 +318,11 @@ class flush_reader final : public iterator_reader {
if (!e) {
return make_ready_future<streamed_mutation_opt>(stdx::nullopt);
} else {
- return make_ready_future<streamed_mutation_opt>((*e).read(mtbl(), schema(), query::full_slice));
+ auto cr = query::clustering_key_filter_ranges::get_ranges(*schema(), query::full_slice, e->key().key());
+ auto snp = e->partition().read(schema());
+ auto mpsr = make_partition_snapshot_reader<partition_snapshot_accounter>(schema(), e->key(), std::move(cr), snp, region(), read_section(), mtbl(), _flushed_memory);
+ _flushed_memory.account_component(*e);
+ return make_ready_future<streamed_mutation_opt>(std::move(mpsr));
}
}
};
--
2.5.5

Paweł Dziepak

<pdziepak@scylladb.com>
unread,
Sep 28, 2016, 4:37:11 AM9/28/16
to Glauber Costa, scylladb-dev
Missing include:

In file included from ./utils/managed_vector.hh:27:0,
                 from tests/managed_vector_test.cc:27:
./utils/allocation_strategy.hh: In member function ‘virtual size_t standard_allocation_strategy::object_memory_size_in_allocator(const void*) const’:
./utils/allocation_strategy.hh:148:16: error: ‘::malloc_usable_size’ has not been declared
         return ::malloc_usable_size(const_cast<void *>(obj));

 
 };
--
2.5.5

--
You received this message because you are subscribed to the Google Groups "ScyllaDB development" group.
To unsubscribe from this group and stop receiving emails from it, send an email to scylladb-dev+unsubscribe@googlegroups.com.

To post to this group, send email to scylla...@googlegroups.com.

Tomasz Grabiec

<tgrabiec@scylladb.com>
unread,
Sep 28, 2016, 8:54:05 AM9/28/16
to Glauber Costa, scylladb-dev
 };

 extern standard_allocation_strategy standard_allocation_strategy_instance;
diff --git a/utils/logalloc.cc b/utils/logalloc.cc
index c244324..17a81f4 100644

--- a/utils/logalloc.cc
+++ b/utils/logalloc.cc
@@ -517,7 +517,7 @@ class segment_pool {
     segment* new_segment(region::impl* r);
     segment_descriptor& descriptor(const segment*);
     // Returns segment containing given object or nullptr.
-    segment* containing_segment(void* obj) const;
+    segment* containing_segment(const void* obj) const;
     void free_segment(segment*) noexcept;
     void free_segment(segment*, segment_descriptor&) noexcept;
     size_t segments_in_use() const;
@@ -720,7 +720,7 @@ segment_pool::descriptor(const segment* seg) {
 }

 segment*
-segment_pool::containing_segment(void* obj) const {
+segment_pool::containing_segment(const void* obj) const {
     auto addr = reinterpret_cast<uintptr_t>(obj);
     auto offset = addr & (segment::size - 1);
     auto index = (addr - _segments_base) >> segment::size_shift;
@@ -1378,6 +1378,17 @@ class region_impl : public allocation_strategy {

         }
     }

+    virtual size_t object_memory_size_in_allocator(const void* obj) const noexcept override {
+        segment* seg = shard_segment_pool.containing_segment(obj);
+
+        if (!seg) {
+            return standard_allocator().object_memory_size_in_allocator(obj);
+        } else {
+            auto desc = reinterpret_cast<object_descriptor*>(reinterpret_cast<uintptr_t>(obj) - sizeof(object_descriptor));
+            return sizeof(object_descriptor) + desc->size();

I think you could achieve better accuracy if you also add desc->padding(). This way if you sum footprint of all objects in the region you will get a number which is equal to "used" size for the whole region.
 

Paweł Dziepak

<pdziepak@scylladb.com>
unread,
Sep 28, 2016, 9:02:24 AM9/28/16
to Tomasz Grabiec, Glauber Costa, scylladb-dev
Only if the code doesn't defer. Padding is changed by compaction and so would object_memory_size_in_allocator(). I think it is better if the value returned by that function doesn't change.
For the purposes of this series adding padding is rather inadvisable since the assumption is that estimates are allowed to be smaller but not larger than the actual value (there is an assertion check) and the code does defer. So, if padding was added here and the rest of the code was unchanged unlucky people could see abort()s.
 

--
You received this message because you are subscribed to the Google Groups "ScyllaDB development" group.
To unsubscribe from this group and stop receiving emails from it, send an email to scylladb-dev+unsubscribe@googlegroups.com.
To post to this group, send email to scylla...@googlegroups.com.

Tomasz Grabiec

<tgrabiec@scylladb.com>
unread,
Sep 28, 2016, 9:51:25 AM9/28/16
to Paweł Dziepak, Glauber Costa, scylladb-dev
I see. 

Those expectations should be documented on allocation_strategy::object_memory_size_in_allocator().
 

Tomasz Grabiec

<tgrabiec@scylladb.com>
unread,
Sep 28, 2016, 10:05:41 AM9/28/16
to Glauber Costa, scylladb-dev
On Mon, Sep 19, 2016 at 3:51 PM, Glauber Costa <gla...@scylladb.com> wrote:
The code that is common will live in its own reader, the iterator_reader.  All
friendly private access to memtable attributes and methods happen through the
iterator reader.

After this patch, we are now left with the scanning_reader - same as always,
but now implemented on top of the iterator_reader, and a flush_reader, which
will be used by SSTable flushes only.

Signed-off-by: Glauber Costa <gla...@scylladb.com>
---
 memtable.hh |   2 +-
 memtable.cc | 127 ++++++++++++++++++++++++++++++++++++++++++++++--------------
 2 files changed, 98 insertions(+), 31 deletions(-)

diff --git a/memtable.hh b/memtable.hh
index 98fc7eb..a0a5cbd 100644
--- a/memtable.hh
+++ b/memtable.hh
@@ -161,5 +161,5 @@ class memtable final : public enable_lw_shared_from_this<memtable>, private loga
         return _replay_position;
     }

-    friend class scanning_reader;
+    friend class iterator_reader;
 };
diff --git a/memtable.cc b/memtable.cc
index 93f3d02..3a1743b 100644
--- a/memtable.cc
+++ b/memtable.cc

Those get_delegate_range() and delegate_reader() don't seem to belong to responsibilities of iterator_reader. They are only used by the scanning_reader for its purposes so perhaps should be there.
 
+
+    mutation_reader delegate_reader(const query::partition_range& delegate,

"delegate" seems like an inapproriate name for a partition range in this context. delegate is what this method creates. "pr"?
 
     virtual future<streamed_mutation_opt> operator()() override {
         if (_delegate_range) {
             return _delegate();
         }

-        // We cannot run concurrently with row_cache::update().
-        if (_memtable->is_flushed()) {
-            // FIXME: Use cache. See column_family::make_reader().
-            _delegate_range = _last ? _range.split_after(*_last, dht::ring_position_comparator(*_memtable->_schema)) : _range;
-            _delegate = make_mutation_reader<sstable_range_wrapping_reader>(
-                _memtable->_sstable, _schema, *_delegate_range, _slice, _pc);
-            _memtable = {};
-            _last = {};
+        // FIXME: Use cache. See column_family::make_reader().
+        _delegate_range = get_delegate_range();
+        if (_delegate_range) {
+            _delegate = delegate_reader(*_delegate_range, _slice, _pc);

*_delegate_range lives on stack, but the reader expects that it will remain alive. We should store that range in this instance.


 
             return _delegate();
         }

-        logalloc::reclaim_lock _(*_memtable);
+        logalloc::reclaim_lock _(region());
         managed_bytes::linearization_context_guard lcg;
-        update_iterators();
-        if (_i == _end) {
+        memtable_entry* e = fetch_next_entry();
+        if (!e) {
+             return make_ready_future<streamed_mutation_opt>(stdx::nullopt);

+        } else {
+            return make_ready_future<streamed_mutation_opt>(e->read(mtbl(), schema(), _slice)); 
+        }
+    }
+};
+
+class flush_reader final : public iterator_reader {
+public:
+    flush_reader(schema_ptr s, lw_shared_ptr<memtable> m)
+        : iterator_reader(std::move(s), std::move(m), query::full_partition_range)
+    {}
+
+    virtual future<streamed_mutation_opt> operator()() override {

+        logalloc::reclaim_lock _(region());
+        managed_bytes::linearization_context_guard lcg;
+        memtable_entry* e = fetch_next_entry();
+        if (!e) {

             return make_ready_future<streamed_mutation_opt>(stdx::nullopt);
+        } else {
+            return make_ready_future<streamed_mutation_opt>((*e).read(mtbl(), schema(), query::full_slice));

         }
-        memtable_entry& e = *_i;
-        ++_i;
-        _last = e.key();
-        _memtable->upgrade_entry(e);
-        return make_ready_future<streamed_mutation_opt>(e.read(_memtable, _schema, _slice));
     }
 };

@@ -219,7 +286,7 @@ memtable::make_reader(schema_ptr s,

 mutation_reader
 memtable::make_flush_reader(schema_ptr s) {
-    return make_mutation_reader<scanning_reader>(std::move(s), shared_from_this(), query::full_partition_range, query::no_clustering_key_filtering, default_priority_class());
+    return make_mutation_reader<flush_reader>(std::move(s), shared_from_this());
 }

 void
--
2.5.5

--
You received this message because you are subscribed to the Google Groups "ScyllaDB development" group.
To unsubscribe from this group and stop receiving emails from it, send an email to scylladb-dev+unsubscribe@googlegroups.com.
To post to this group, send email to scylla...@googlegroups.com.

Paweł Dziepak

<pdziepak@scylladb.com>
unread,
Sep 28, 2016, 10:08:36 AM9/28/16
to Tomasz Grabiec, Glauber Costa, scylladb-dev
_delegate_range is a member of scanning_reader. I don't see any problem here.
 

Glauber Costa

<glauber@scylladb.com>
unread,
Sep 28, 2016, 10:57:37 AM9/28/16
to Paweł Dziepak, Tomasz Grabiec, scylladb-dev
Exactly as Pavel said.
padding can be changed by compaction, so this should reflect only the
immutable size.

I'll add a comment.

>>
>> --
>> You received this message because you are subscribed to the Google Groups
>> "ScyllaDB development" group.
>> To unsubscribe from this group and stop receiving emails from it, send an
>> email to scylladb-dev...@googlegroups.com.

Glauber Costa

<glauber@scylladb.com>
unread,
Sep 28, 2016, 10:58:38 AM9/28/16
to Paweł Dziepak, Tomasz Grabiec, scylladb-dev
I am merely moving code around here. If this gets changed I'll be
happy to rebase.
>>> email to scylladb-dev...@googlegroups.com.
>>> To post to this group, send email to scylla...@googlegroups.com.
>>> To view this discussion on the web visit
>>> https://groups.google.com/d/msgid/scylladb-dev/d7e555e83193af621d8c08da8e27b17d6c8ae624.1474292152.git.glauber%40scylladb.com.
>>> For more options, visit https://groups.google.com/d/optout.
>>
>>
>> --
>> You received this message because you are subscribed to the Google Groups
>> "ScyllaDB development" group.
>> To unsubscribe from this group and stop receiving emails from it, send an
>> email to scylladb-dev...@googlegroups.com.

Tomasz Grabiec

<tgrabiec@scylladb.com>
unread,
Sep 28, 2016, 11:02:30 AM9/28/16
to Paweł Dziepak, Glauber Costa, scylladb-dev
Right, there's no problem here.
 

Tomasz Grabiec

<tgrabiec@scylladb.com>
unread,
Sep 28, 2016, 11:22:13 AM9/28/16
to Glauber Costa, scylladb-dev
On Mon, Sep 19, 2016 at 3:51 PM, Glauber Costa <gla...@scylladb.com> wrote:
Scylla currently suffers from a brick wall behavior of the request throttler.
Requests pile up until we reach the dirty memory limit, at which point we stop
serving them until we have freed enough memory to allow for more requests.

The problem is that freeing dirty memory means writing an SSTable to completion.
That can take a long time, even if we are blessed with great disks. Those long
waiting times can and will translate into timeouts. That is bad behavior.

What this patch does is introduce one form of virtual dirty memory accounting.
Instead of allowing 100 % of the dirty memory to be filled up until we stop
accepting requests, we will do that when we reach 50 % of memory. However,
instead of releasing requests only when an SSTable is fully written, we start
releasing them when some memory was written.

The practical effect of that is that once we reach 50 % occupancy in our dirty
memory region, we will bring the system from CPU speed to disk speed, and will
start accepting requests only at the rate we are able to write memory back.

Signed-off-by: Glauber Costa <gla...@scylladb.com>
---
 database.cc | 18 ++++++++++++++--
 memtable.cc | 71 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 86 insertions(+), 3 deletions(-)

diff --git a/database.cc b/database.cc
index 012eaf2..07224d3 100644
--- a/database.cc
+++ b/database.cc
@@ -1548,8 +1548,22 @@ database::database(const db::config& cfg)

     // in a different region group. This is because throttled requests are serviced in FIFO order,
     // and we don't want system requests to be waiting for a long time behind user requests.
     , _system_dirty_memory_manager(*this, _memtable_total_space + (10 << 20))
-    , _dirty_memory_manager(*this, &_system_dirty_memory_manager, _memtable_total_space)
-    , _streaming_dirty_memory_manager(*this, &_dirty_memory_manager, _streaming_memtable_total_space)
+    // The total space that can be used by memtables is _memtable_total_space, but we will only
+    // allow the region_group to grow to half of that. This is because of virtual_dirty: memtables
+    // can take a long time to flush, and if we are using the maximum amount of memory possible,
+    // then requests will block until we finish flushing at least one memtable.
+    //
+    // We can free memory until the whole memtable is flushed because we need to keep it in memory
+    // until the end, but we can fake freeing memory. When we are done with an element of the
+    // memtable, we will update the region group pretending memory just went down by that amount.
+    //
+    // Because the amount of memory that we pretend to free should be close enough to the actual
+    // memory used by the memtables, that effectively creates two sub-regions inside the dirty
+    // region group, of equal size. In the worst case, we will have _memtable_total_space dirty
+    // bytes used, and half of that already virtually freed.
+    , _dirty_memory_manager(*this, &_system_dirty_memory_manager, _memtable_total_space / 2)
+    // The same goes for streaming in respect to virtual dirty.
+    , _streaming_dirty_memory_manager(*this, &_dirty_memory_manager, _streaming_memtable_total_space / 2)
     , _version(empty_version)
     , _enable_incremental_backups(cfg.incremental_backups())
 {
diff --git a/memtable.cc b/memtable.cc
index 3a1743b..33bf39d 100644
--- a/memtable.cc
+++ b/memtable.cc

Why do we need to run with_allocator() here?
 
+            return current_allocator().object_memory_size_in_allocator(&e) +

Since we can't have perfect estimates anyway, perhaps we could save on the virtual calls to the allocator and just use sizeof().
 
+                   current_allocator().object_memory_size_in_allocator(&*(partition_snapshot(e.schema(), &(e.partition())).version())) +
+                   e.key().key().memory_usage(); 

This could be encapsulated in memtable_entry::memory_usage_without_rows()
 
 
+        });
+        update_bytes_read(delta);
+    }
+};
+
+class partition_snapshot_accounter {
+    flush_memory_accounter& _accounter;
+public:
+    partition_snapshot_accounter(flush_memory_accounter& acct): _accounter(acct) {}
+
+    // We will be passed mutation fragments here, and they are allocated using the standard
+    // allocator. So we can't compute the size in memtable precisely. However, precise accounting is
+    // hard anyway, since we may be holding multiple snapshots of the partitions, and the
+    // partition_snapshot_reader may compose them. In doing so, we move memory to the standard
+    // allocation. As long as our size read here is lesser or equal to the size in the memtables, we
+    // are safe, and worst case we will allow a bit fewer requests in.
+    void operator()(const range_tombstone& rt) {
+        _accounter.update_bytes_read(rt.memory_usage())

If clustering keys are small enough, memory_usage() will return 0 since bytes will not use external storage. Do we account for sizeof(range_tombstone_entry) anywhere?
 
;

+    }
+
+    void operator()(const static_row& sr) {
+        _accounter.update_bytes_read(sr.memory_usage());
+    }
+
+    void operator()(const clustering_row& cr) {
+        // Every clustering row is stored in a rows_entry object, and that has some significant
+        // overhead - so add it here. We will be a bit short on our estimate because we can't know
+        // what is the size in the allocator for this rows_entry object: we may have many snapshots,
+        // and we don't know which one(s) contributed to the generation of this mutation fragment.
+        //
+        // We will add the size of the struct here, and that should be good enough.
+        _accounter.update_bytes_read(sizeof(rows_entry) + cr.memory_usage());
+    }
+};
+
 class flush_reader final : public iterator_reader {
+    flush_memory_accounter _flushed_memory;
 public:
     flush_reader(schema_ptr s, lw_shared_ptr<memtable> m)
         : iterator_reader(std::move(s), std::move(m), query::full_partition_range)
+        , _flushed_memory(region())

     {}
+    flush_reader(const flush_reader&) = delete;
+    flush_reader(flush_reader&&) = delete;
+    flush_reader& operator=(flush_reader&&) = delete;
+    flush_reader& operator=(const flush_reader&) = delete;

     virtual future<streamed_mutation_opt> operator()() override {
         logalloc::reclaim_lock _(region());
@@ -253,7 +318,11 @@ class flush_reader final : public iterator_reader {
         if (!e) {
             return make_ready_future<streamed_mutation_opt>(stdx::nullopt);
         } else {
-            return make_ready_future<streamed_mutation_opt>((*e).read(mtbl(), schema(), query::full_slice)); 
+            auto cr = query::clustering_key_filter_ranges::get_ranges(*schema(), query::full_slice, e->key().key());
+            auto snp = e->partition().read(schema());
+            auto mpsr = make_partition_snapshot_reader<partition_snapshot_accounter>(schema(), e->key(), std::move(cr), snp, region(), read_section(), mtbl(), _flushed_memory);
+            _flushed_memory.account_component(*e);
+            return make_ready_future<streamed_mutation_opt>(std::move(mpsr));
         }
     }
 };
--
2.5.5

--
You received this message because you are subscribed to the Google Groups "ScyllaDB development" group.
To unsubscribe from this group and stop receiving emails from it, send an email to scylladb-dev+unsubscribe@googlegroups.com.

To post to this group, send email to scylla...@googlegroups.com.

Tomasz Grabiec

<tgrabiec@scylladb.com>
unread,
Sep 28, 2016, 11:28:36 AM9/28/16
to Glauber Costa, scylladb-dev
Looks fine overall.

On Mon, Sep 19, 2016 at 3:51 PM, Glauber Costa <gla...@scylladb.com> wrote:
Available at:

        To g...@github.com:glommer/scylla.git    virtual-dirty-v3

Description:
============


Scylla currently suffers from a brick wall behavior of the request throttler.
Requests pile up until we reach the dirty memory limit, at which point we stop
serving them until we have freed enough memory to allow for more requests.

The problem is that freeing dirty memory means writing an SSTable to completion.
That can take a long time, even if we are blessed with great disks. Those long
waiting times can and will translate into timeouts. That is bad behavior.

What this patch does is introduce one form of virtual dirty memory accounting.
Instead of allowing 100 % of the dirty memory to be filled up until we stop
accepting requests, we will do that when we reach 50 % of memory. However,
instead of releasing requests only when an SSTable is fully written, we start
releasing them when some memory was written.

The practical effect of that, is that once we reach 50 % occupancy in our dirty

memory region, we will bring the system from CPU speed to disk speed, and will
start accepting requests only at the rate we are able to write memory back.

components at this point. It is still unclear why, but it seems to stem from
the same issue that is plaguing the commitlog.

In any case, while we could mitigate this issue, we can't really fix it. Even
if we decide to optimistically add the free_space() remaining to the virtual
dirty area - allowing more requests to come in, a new flush cannot be initiated
until this one finishes (assuming max concurrency already used). This means
requests will be blocked for this period.


Changes from V2:
================
- The delay-blocked_requests_max counter is removed. It is important to have but
  not crucial, and in its current form it lacks important information about peaks.
  It will be done later as a quantile.
- Revert back to accounting mutation fragments.
- Fix the problem with accounting snapshots, which can lead to overaccounting. Do
  this by reverting back to accounting MFs). We lose a bit of accuracy, but there
  seem to be no simple way to properly handle snapshot partitions otherwise.

Changes from V1:
================
- Only used space is accounted for, and we don't account for padding, or free
  space in the region. While this slow us down a bit, it is not significant and
  it simplifies the code a lot - since we don't have to have extra protections
  against compactions
- get rid of virtual functions
- account objects before they are transformed into a mutation fragment -
  guaranteeing that they will be in the correct region.

Changes from RFC:
=================
- The accounting was moved to the reader side (mutation_reader), instead of being
  done by at the SSTable side.
- Accounting of relevant internal structures (memtable_entry, rows entry) is more
  precise.

Glauber Costa (9):

  database: export virtual dirty bytes region group
  LSA: export information about size of the throttle queue
  LSA: export information about object memory footprint
  sstables: use special reader for writing a memtable
  memtables: split scanning reader in two
  LSA: allow a group to query its own region group
  move partition_snapshot_reader code to header file
  add accounting of memory read to partition_snapshot_reader
  database: allow virtual dirty memory management

 database.hh                  |  25 ++++++
 memtable.hh                  |   5 +-
 partition_version.hh         | 204 ++++++++++++++++++++++++++++++++++++++++---
 utils/allocation_strategy.hh |   9 ++
 utils/logalloc.hh            |   6 ++
 database.cc                  |  37 +++++++-
 memtable.cc                  | 199 +++++++++++++++++++++++++++++++++++------
 partition_version.cc         | 178 -------------------------------------
 sstables/sstables.cc         |   2 +-
 utils/logalloc.cc            |  23 ++++-
 10 files changed, 462 insertions(+), 226 deletions(-)
--
2.5.5

--
You received this message because you are subscribed to the Google Groups "ScyllaDB development" group.
To unsubscribe from this group and stop receiving emails from it, send an email to scylladb-dev+unsubscribe@googlegroups.com.
To post to this group, send email to scylla...@googlegroups.com.

Glauber Costa

<glauber@scylladb.com>
unread,
Sep 28, 2016, 11:45:12 AM9/28/16
to Tomasz Grabiec, scylladb-dev
Because for the memtable entry we'll be calling into the allocator's
size in allocator function. That needs to be done with the right
allocator context.

>
>>
>> + return
>> current_allocator().object_memory_size_in_allocator(&e) +
>
>
> Since we can't have perfect estimates anyway, perhaps we could save on the
> virtual calls to the allocator and just use sizeof().

As I have described in my opening letter, I am achieving > 99 %
accuracy with this method. We don't need to be 100 % accurate, but the
more we virtual free, the more requests we can allow.

The allocator overhead accounts for 5 - 10 % of the size of the
structures for each I am calling into the allocator, and they happen
at every partition. If we have a lot of small partitions, the
difference will be significant.

It's not the end of the world, and we could ultimately do it. I just
think that the overhead here is justified given what I have outlined
above.

>
>>
>> +
>> current_allocator().object_memory_size_in_allocator(&*(partition_snapshot(e.schema(),
>> &(e.partition())).version())) +
>> + e.key().key().memory_usage();
>
>
> This could be encapsulated in memtable_entry::memory_usage_without_rows()

Ok, will look into it.
Is range_tombstone_entry allocated from the LSA? I was under the
impression that it wasn't.
Can you point me to where this allocation happen?

If this is not LSA allocated, we don't care. Since we're only matching
LSA allocations here.

>
>>
>> ;
>> email to scylladb-dev...@googlegroups.com.

Paweł Dziepak

<pdziepak@scylladb.com>
unread,
Sep 28, 2016, 11:58:50 AM9/28/16
to Glauber Costa, Tomasz Grabiec, scylladb-dev
Well, we don't have range_tombstone_entry any more, it is now "range_tombstone". Basically, it is analogous to clustering rows – in mutation_partition there is range_tombstone_list _row_tombstones while range_tombstone_list is a bi::set of range_tombstones. All parts of the mutation_partition are, when in memtable, allocated using LSA. So, sizeof(range_tombstone) should be added the same way you add sizeof(rows_entry) in the clustering_row overload. If you want to see the exact place where range_tombstones are allocated see range_tombstone_list::apply_reversibly() and range_tombstone_list::insert_from() in range_tombstone_list.cc.
 
>
>>
>> ;

>> To post to this group, send email to scylla...@googlegroups.com.
>> To view this discussion on the web visit
>> https://groups.google.com/d/msgid/scylladb-dev/7f0ab28a6a475a1bf249e781d8c873eb405a0c5c.1474292152.git.glauber%40scylladb.com.
>> For more options, visit https://groups.google.com/d/optout.
>
>

--
You received this message because you are subscribed to the Google Groups "ScyllaDB development" group.
To unsubscribe from this group and stop receiving emails from it, send an email to scylladb-dev+unsubscribe@googlegroups.com.

To post to this group, send email to scylla...@googlegroups.com.

Glauber Costa

<glauber@scylladb.com>
unread,
Sep 28, 2016, 12:02:11 PM9/28/16
to Paweł Dziepak, Tomasz Grabiec, scylladb-dev
Ok. I see things like this:

auto rt =
current_allocator().construct<range_tombstone>(it->start_bound(),
new_end, it->tomb);

Which indicate it is LSA-allocated. You are therefore right.
I'll add accounting for the range tombstone structure as well.

>>
>> >
>> >>
>> >> ;
>> >> email to scylladb-dev...@googlegroups.com.
>> >> To post to this group, send email to scylla...@googlegroups.com.
>> >> To view this discussion on the web visit
>> >>
>> >> https://groups.google.com/d/msgid/scylladb-dev/7f0ab28a6a475a1bf249e781d8c873eb405a0c5c.1474292152.git.glauber%40scylladb.com.
>> >> For more options, visit https://groups.google.com/d/optout.
>> >
>> >
>>
>> --
>> You received this message because you are subscribed to the Google Groups
>> "ScyllaDB development" group.
>> To unsubscribe from this group and stop receiving emails from it, send an
>> email to scylladb-dev...@googlegroups.com.

Glauber Costa

<glauber@scylladb.com>
unread,
Sep 28, 2016, 12:37:27 PM9/28/16
to Paweł Dziepak, scylladb-dev
It compiles just fine for me.
But I will add the malloc header just in case. We're probably using
different compiler versions, maybe it is added implicitly for me?

>
>
>>
>> };
>> email to scylladb-dev...@googlegroups.com.

Avi Kivity

<avi@scylladb.com>
unread,
Sep 28, 2016, 12:39:38 PM9/28/16
to Glauber Costa, Paweł Dziepak, scylladb-dev
Or maybe Pawel was building debug mode. In any case the rule is: use a
symbol in file X, include the header that defines it in file X.

Tomasz Grabiec

<tgrabiec@scylladb.com>
unread,
Sep 28, 2016, 12:50:00 PM9/28/16
to Glauber Costa, scylladb-dev
Which function is that? In case of object_memory_size_in_allocator(), we're explicitly choosing which allocator to call, so we could just call _region.allocator() directly. Or did you mean memory_usage()? That one is not documented, so I am not sure. Looking at its implementation, it doesn't call into allocator though.
 

>
>>
>> +            return
>> current_allocator().object_memory_size_in_allocator(&e) +
>
>
> Since we can't have perfect estimates anyway, perhaps we could save on the
> virtual calls to the allocator and just use sizeof().

As I have described in my opening letter, I am achieving > 99 %
accuracy with this method. We don't need to be 100 % accurate, but the
more we virtual free, the more requests we can allow.

The allocator overhead accounts for 5 - 10 % of the size of the
structures for each I am calling into the allocator, and they happen
at every partition. If we have a lot of small partitions, the
difference will be significant.

It's not the end of the world, and we could ultimately do it. I just
think that the overhead here is justified given what I have outlined
above.

Fair enough. 

Btw, did you asses impact on CPU usage of this series?
Every part of mutation_partition is allocated with LSA. mutation_partition::_row_tombstones is an intrusive map which holds range_tombstone objects. Allocated for example in range_tombstone_list::apply_reversibly():

    auto rt = current_allocator().construct<range_tombstone>(
            std::move(start), start_kind, std::move(end), end_kind, std::move(tomb));

 

Glauber Costa

<glauber@scylladb.com>
unread,
Sep 28, 2016, 1:07:13 PM9/28/16
to Avi Kivity, scylladb-dev, Paweł Dziepak

I agree. I will include the header in next version

Glauber Costa

<glauber@scylladb.com>
unread,
Sep 28, 2016, 6:34:13 PM9/28/16
to Tomasz Grabiec, scylladb-dev
I meant the former. As you have noticed yourself, this is more of a
style thing since the end result is the same.
The reason I prefer this, though, is that if we ever convert
memory_usage() to need to be allocator-dependant, this code will be
ready to go.

It's not a must, though.

>>
>>
>> >
>> >>
>> >> + return
>> >> current_allocator().object_memory_size_in_allocator(&e) +
>> >
>> >
>> > Since we can't have perfect estimates anyway, perhaps we could save on
>> > the
>> > virtual calls to the allocator and just use sizeof().
>>
>> As I have described in my opening letter, I am achieving > 99 %
>> accuracy with this method. We don't need to be 100 % accurate, but the
>> more we virtual free, the more requests we can allow.
>>
>> The allocator overhead accounts for 5 - 10 % of the size of the
>> structures for each I am calling into the allocator, and they happen
>> at every partition. If we have a lot of small partitions, the
>> difference will be significant.
>>
>> It's not the end of the world, and we could ultimately do it. I just
>> think that the overhead here is justified given what I have outlined
>> above.
>
>
> Fair enough.
>
> Btw, did you asses impact on CPU usage of this series?

Just did.
I have run a workload that gets to 100 % CPU utilization and I see no
conclusive penalty. I do see a 1 % difference in final throughput, but
I see that between runs without this patch(set) applied as well.
>> >> email to scylladb-dev...@googlegroups.com.

Glauber Costa

<glauber@scylladb.com>
unread,
Sep 28, 2016, 9:05:32 PM9/28/16
to scylladb-dev@googlegroups.com, Glauber Costa
Available at:

To g...@github.com:glommer/scylla.git virtual-dirty-v5

Description:
============

Scylla currently suffers from a brick wall behavior of the request throttler.
Requests pile up until we reach the dirty memory limit, at which point we stop
serving them until we have freed enough memory to allow for more requests.

The problem is that freeing dirty memory means writing an SSTable to completion.
That can take a long time, even if we are blessed with great disks. Those long
waiting times can and will translate into timeouts. That is bad behavior.

What this patch does is introduce one form of virtual dirty memory accounting.
Instead of allowing 100 % of the dirty memory to be filled up until we stop
accepting requests, we will do that when we reach 50 % of memory. However,
instead of releasing requests only when an SSTable is fully written, we start
releasing them when some memory was written.

The practical effect of that, is that once we reach 50 % occupancy in our dirty
memory region, we will bring the system from CPU speed to disk speed, and will
start accepting requests only at the rate we are able to write memory back.

components at this point. This happened in practice with a buggy kernel that
would result in flushes taking a long time.

After that is fixed, this is just a theoretical problem and in practice it
shouldn't matter given the time we expect those operations to take.

Changes from V4:
================
- Fix build due to lack of malloc header
- Account for range_tombstone structure
- Style changes as suggested by Tomek
- Added comments on the LSA memory footprint patch

Changes from V3:
================
- Fixed a bisectability issue pointed out by Pawel
- Incorporated a suggestion by Pawel, and now _buffer is a private (not protected)
member of the streamed_mutation class.

Changes from V2:
================
- The delay-blocked_requests_max counter is removed. It is important to have but
not crucial, and in its current form it lacks important information about peaks.
It will be done later as a quantile.
- Revert back to accounting mutation fragments.
- Fix the problem with accounting snapshots, which can lead to overaccounting. Do
this by reverting back to accounting MFs). We lose a bit of accuracy, but there
seem to be no simple way to properly handle snapshot partitions otherwise.

Changes from V1:
================
- Only used space is accounted for, and we don't account for padding, or free
space in the region. While this slow us down a bit, it is not significant and
it simplifies the code a lot - since we don't have to have extra protections
against compactions
- get rid of virtual functions
- account objects before they are transformed into a mutation fragment -
guaranteeing that they will be in the correct region.

Changes from RFC:
=================
- The accounting was moved to the reader side (mutation_reader), instead of being
done by at the SSTable side.
- Accounting of relevant internal structures (memtable_entry, rows entry) is more
precise.


Glauber Costa (10):
database: export virtual dirty bytes region group
LSA: export information about size of the throttle queue
LSA: export information about object memory footprint
sstables: use special reader for writing a memtable
memtables: split scanning reader in two
LSA: allow a group to query its own region group
move partition_snapshot_reader code to header file
add accounting of memory read to partition_snapshot_reader
streamed_mutation: make _buffer private
database: allow virtual dirty memory management

database.hh | 25 ++++++
memtable.hh | 9 +-
partition_version.hh | 204 ++++++++++++++++++++++++++++++++++++++++---
streamed_mutation.hh | 2 +-
utils/allocation_strategy.hh | 13 +++
utils/logalloc.hh | 6 ++
database.cc | 37 +++++++-
memtable.cc | 197 +++++++++++++++++++++++++++++++++++------
partition_version.cc | 178 -------------------------------------
sstables/sstables.cc | 2 +-
utils/logalloc.cc | 25 +++++-
11 files changed, 470 insertions(+), 228 deletions(-)

--
2.5.5

Glauber Costa

<glauber@scylladb.com>
unread,
Sep 28, 2016, 9:05:33 PM9/28/16
to scylladb-dev@googlegroups.com, Glauber Costa
Currently, we export the region group where memtables are placed as dirty bytes.
Upcoming patches will optimistically mark some bytes in this region as free, a
scheme we know as "virtual dirty".

We are still interested in knowing the real state of the dirty region, so we
will keep track of the bytes virtually freed and split the counters in two.

Signed-off-by: Glauber Costa <gla...@scylladb.com>
---
database.hh | 25 +++++++++++++++++++++++++
database.cc | 10 +++++++++-
2 files changed, 34 insertions(+), 1 deletion(-)

diff --git a/database.hh b/database.hh
index 8d9eb30..b7ed3c5 100644
--- a/database.hh
+++ b/database.hh
@@ -74,6 +74,7 @@
#include <seastar/core/rwlock.hh>
#include <seastar/core/shared_future.hh>
#include "tracing/trace_state.hh"
+#include <boost/intrusive/parent_from_member.hpp>

class frozen_mutation;
class reconcilable_result;
@@ -133,6 +134,7 @@ class dirty_memory_manager: public logalloc::region_group_reclaimer {
// default values here.
size_t _concurrency;
semaphore _flush_serializer;
+ int64_t _dirty_bytes_released_pre_accounted = 0;

seastar::gate _waiting_flush_gate;
std::vector<shared_memtable> _pending_flushes;
@@ -156,6 +158,11 @@ class dirty_memory_manager: public logalloc::region_group_reclaimer {
, _region_group(&parent->_region_group, *this)
, _concurrency(concurrency)
, _flush_serializer(concurrency) {}
+
+ static dirty_memory_manager& from_region_group(logalloc::region_group *rg) {
+ return *(boost::intrusive::get_parent_from_member(rg, &dirty_memory_manager::_region_group));
+ }
+
logalloc::region_group& region_group() {
return _region_group;
}
@@ -164,6 +171,24 @@ class dirty_memory_manager: public logalloc::region_group_reclaimer {
return _region_group;
}

+ void revert_potentially_cleaned_up_memory(int64_t delta) {
+ _region_group.update(delta);
+ _dirty_bytes_released_pre_accounted -= delta;
+ }
+
+ void account_potentially_cleaned_up_memory(int64_t delta) {
+ _region_group.update(-delta);
+ _dirty_bytes_released_pre_accounted += delta;
+ }
+
+ size_t real_dirty_memory() const {
+ return _region_group.memory_used() + _dirty_bytes_released_pre_accounted;
+ }
+
+ size_t virtual_dirty_memory() const {
+ return _region_group.memory_used();
+ }
+
template <typename Func>
future<> serialize_flush(Func&& func) {
return seastar::with_gate(_waiting_flush_gate, [this, func] () mutable {
diff --git a/database.cc b/database.cc
index c9519b2..3714385 100644
--- a/database.cc
+++ b/database.cc
@@ -1601,7 +1601,15 @@ database::setup_collectd() {
, scollectd::per_cpu_plugin_instance
, "bytes", "dirty")
, scollectd::make_typed(scollectd::data_type::GAUGE, [this] {
- return dirty_memory_region_group().memory_used();
+ return _dirty_memory_manager.real_dirty_memory();
+ })));
+
+ _collectd.push_back(
+ scollectd::add_polled_metric(scollectd::type_instance_id("memory"
+ , scollectd::per_cpu_plugin_instance
+ , "bytes", "virtual_dirty")
+ , scollectd::make_typed(scollectd::data_type::GAUGE, [this] {
+ return _dirty_memory_manager.virtual_dirty_memory();
})));

_collectd.push_back(
--
2.5.5

Glauber Costa

<glauber@scylladb.com>
unread,
Sep 28, 2016, 9:05:33 PM9/28/16
to scylladb-dev@googlegroups.com, Glauber Costa
Also add information about for how long has the oldest been sitting in the
queue. This is part of the backpressure work to allow us to throttle incoming
requests if we won't have memory to process them. Shortages can happen in all
sorts of places, and it is useful when designing and testing the solutions to
know where they are, and how bad they are.

This counter is named for consistency after similar counters from transport/.

Signed-off-by: Glauber Costa <gla...@scylladb.com>
---
utils/logalloc.hh | 4 ++++
database.cc | 9 +++++++++
2 files changed, 13 insertions(+)

diff --git a/utils/logalloc.hh b/utils/logalloc.hh
index db8ab35..1b104ec 100644
--- a/utils/logalloc.hh
+++ b/utils/logalloc.hh
@@ -318,6 +318,10 @@ class region_group {
_shutdown_requested = true;
return _asynchronous_gate.close();
}
+
+ size_t blocked_requests() {
+ return _blocked_requests.size();
+ }
private:
// Make sure we get a notification and can call release_requests when one of our ancestors that
// used to block us is no longer under memory pressure.
diff --git a/database.cc b/database.cc
index 3714385..e7be0b0 100644
--- a/database.cc
+++ b/database.cc
@@ -1620,6 +1620,15 @@ database::setup_collectd() {
));

_collectd.push_back(
+ scollectd::add_polled_metric(scollectd::type_instance_id("database"
+ , scollectd::per_cpu_plugin_instance
+ , "queue_length", "requests_blocked_memory")
+ , scollectd::make_typed(scollectd::data_type::GAUGE, [this] {
+ return _dirty_memory_manager.region_group().blocked_requests();
+ })
+ ));
+
+ _collectd.push_back(
scollectd::add_polled_metric(scollectd::type_instance_id("memtables"
, scollectd::per_cpu_plugin_instance
, "bytes", "pending_flushes")
--
2.5.5

Glauber Costa

<glauber@scylladb.com>
unread,
Sep 28, 2016, 9:05:34 PM9/28/16
to scylladb-dev@googlegroups.com, Glauber Costa
We allocate objects of a certain size, but we use a bit more memory to hold
them. To get a clerer picture about how much memory will an object cost us, we
need help from the allocator. This patch exports an interface that allow users
to query into a specific allocator to get that information.

Signed-off-by: Glauber Costa <gla...@scylladb.com>
---
utils/allocation_strategy.hh | 13 +++++++++++++
utils/logalloc.cc | 17 ++++++++++++++---
2 files changed, 27 insertions(+), 3 deletions(-)

diff --git a/utils/allocation_strategy.hh b/utils/allocation_strategy.hh
index bc5613a..8792833 100644
--- a/utils/allocation_strategy.hh
+++ b/utils/allocation_strategy.hh
@@ -23,6 +23,7 @@

#include <cstdlib>
#include <seastar/core/memory.hh>
+#include <malloc.h>

// A function used by compacting collectors to migrate objects during
// compaction. The function should reconstruct the object located at src
@@ -97,6 +98,14 @@ class allocation_strategy {
// Doesn't invalidate references to objects allocated with this strategy.
virtual void free(void*) = 0;

+ // Returns the total immutable memory size used by the allocator to host
+ // this object. This will be at least the size of the object itself, plus
+ // any immutable overhead needed to represent the object (if any).
+ //
+ // The immutable overhead is the overhead that cannot change over the
+ // lifetime of the object (such as padding, etc).
+ virtual size_t object_memory_size_in_allocator(const void* obj) const noexcept = 0;
+
// Like alloc() but also constructs the object with a migrator using
// standard move semantics. Allocates respecting object's alignment
// requirement.
@@ -138,6 +147,10 @@ class standard_allocation_strategy : public allocation_strategy {
virtual void free(void* obj) override {
::free(obj);
}
+
+ virtual size_t object_memory_size_in_allocator(const void* obj) const noexcept {
+ return ::malloc_usable_size(const_cast<void *>(obj));
+ }
};

extern standard_allocation_strategy standard_allocation_strategy_instance;
diff --git a/utils/logalloc.cc b/utils/logalloc.cc
index 837c3cb..cef52c9 100644
--- a/utils/logalloc.cc
+++ b/utils/logalloc.cc
@@ -517,7 +517,7 @@ class segment_pool {
segment* new_segment(region::impl* r);
segment_descriptor& descriptor(const segment*);
// Returns segment containing given object or nullptr.
- segment* containing_segment(void* obj) const;
+ segment* containing_segment(const void* obj) const;
void free_segment(segment*) noexcept;
void free_segment(segment*, segment_descriptor&) noexcept;
size_t segments_in_use() const;
@@ -720,7 +720,7 @@ segment_pool::descriptor(const segment* seg) {
}

segment*
-segment_pool::containing_segment(void* obj) const {
+segment_pool::containing_segment(const void* obj) const {
auto addr = reinterpret_cast<uintptr_t>(obj);
auto offset = addr & (segment::size - 1);
auto index = (addr - _segments_base) >> segment::size_shift;
@@ -830,7 +830,7 @@ class segment_pool {
_segments.erase(i);
::free(seg);
}
- segment* containing_segment(void* obj) const {
+ segment* containing_segment(const void* obj) const {
uintptr_t addr = reinterpret_cast<uintptr_t>(obj);
auto seg = reinterpret_cast<segment*>(align_down(addr, static_cast<uintptr_t>(segment::size)));
auto i = _segments.find(seg);
@@ -1380,6 +1380,17 @@ class region_impl : public allocation_strategy {
}
}

+ virtual size_t object_memory_size_in_allocator(const void* obj) const noexcept override {
+ segment* seg = shard_segment_pool.containing_segment(obj);
+
+ if (!seg) {
+ return standard_allocator().object_memory_size_in_allocator(obj);
+ } else {
+ auto desc = reinterpret_cast<object_descriptor*>(reinterpret_cast<uintptr_t>(obj) - sizeof(object_descriptor));
+ return sizeof(object_descriptor) + desc->size();
+ }
+ }
+
// Merges another region into this region. The other region is made
// to refer to this region.
// Doesn't invalidate references to allocated objects.
--
2.5.5

Glauber Costa

<glauber@scylladb.com>
unread,
Sep 28, 2016, 9:05:35 PM9/28/16
to scylladb-dev@googlegroups.com, Glauber Costa
Right now the special reader doesn't do much, but the idea is that we will
soon replace it will a reader that specializes in flush, and is in turn able
to provide read-side on-flush functionality like virtual dirty.

Signed-off-by: Glauber Costa <gla...@scylladb.com>
---
memtable.hh | 3 +++
memtable.cc | 5 +++++
sstables/sstables.cc | 2 +-
3 files changed, 9 insertions(+), 1 deletion(-)

diff --git a/memtable.hh b/memtable.hh
index c6f9960..98fc7eb 100644
--- a/memtable.hh
+++ b/memtable.hh
@@ -147,6 +147,9 @@ class memtable final : public enable_lw_shared_from_this<memtable>, private loga
const query::partition_slice& slice = query::full_slice,
const io_priority_class& pc = default_priority_class());

+
+ mutation_reader make_flush_reader(schema_ptr);
+
mutation_source as_data_source();
key_source as_key_source();

diff --git a/memtable.cc b/memtable.cc
index ac6b04a..19fc200 100644
--- a/memtable.cc
+++ b/memtable.cc
@@ -217,6 +217,11 @@ memtable::make_reader(schema_ptr s,
}
}

+mutation_reader
+memtable::make_flush_reader(schema_ptr s) {
+ return make_mutation_reader<scanning_reader>(std::move(s), shared_from_this(), query::full_partition_range, query::full_slice, default_priority_class());
+}
+
void
memtable::update(const db::replay_position& rp) {
if (_replay_position < rp) {
diff --git a/sstables/sstables.cc b/sstables/sstables.cc
index e87515a..e5a2cb3 100644
--- a/sstables/sstables.cc
+++ b/sstables/sstables.cc
@@ -1781,7 +1781,7 @@ void components_writer::consume_end_of_stream() {

future<> sstable::write_components(memtable& mt, bool backup, const io_priority_class& pc, bool leave_unsealed) {
_collector.set_replay_position(mt.replay_position());
- return write_components(mt.make_reader(mt.schema()),
+ return write_components(mt.make_flush_reader(mt.schema()),
mt.partition_count(), mt.schema(), std::numeric_limits<uint64_t>::max(), backup, pc, leave_unsealed);
}

--
2.5.5

Glauber Costa

<glauber@scylladb.com>
unread,
Sep 28, 2016, 9:05:36 PM9/28/16
to scylladb-dev@googlegroups.com, Glauber Costa
The code that is common will live in its own reader, the iterator_reader. All
friendly private access to memtable attributes and methods happen through the
iterator reader.

After this patch, we are now left with the scanning_reader - same as always,
but now implemented on top of the iterator_reader, and a flush_reader, which
will be used by SSTable flushes only.

Signed-off-by: Glauber Costa <gla...@scylladb.com>
---
memtable.hh | 2 +-
memtable.cc | 127 ++++++++++++++++++++++++++++++++++++++++++++++--------------
2 files changed, 98 insertions(+), 31 deletions(-)

diff --git a/memtable.hh b/memtable.hh
index 98fc7eb..a0a5cbd 100644
--- a/memtable.hh
+++ b/memtable.hh
@@ -161,5 +161,5 @@ class memtable final : public enable_lw_shared_from_this<memtable>, private loga
return _replay_position;
}

- friend class scanning_reader;
+ friend class iterator_reader;
};
diff --git a/memtable.cc b/memtable.cc
index 19fc200..3a1743b 100644
--- a/memtable.cc
+++ b/memtable.cc
+
+ mutation_reader delegate_reader(const query::partition_range& delegate,
virtual future<streamed_mutation_opt> operator()() override {
if (_delegate_range) {
return _delegate();
}

- // We cannot run concurrently with row_cache::update().
- if (_memtable->is_flushed()) {
- // FIXME: Use cache. See column_family::make_reader().
- _delegate_range = _last ? _range.split_after(*_last, dht::ring_position_comparator(*_memtable->_schema)) : _range;
- _delegate = make_mutation_reader<sstable_range_wrapping_reader>(
- _memtable->_sstable, _schema, *_delegate_range, _slice, _pc);
- _memtable = {};
- _last = {};
+ // FIXME: Use cache. See column_family::make_reader().
+ _delegate_range = get_delegate_range();
+ if (_delegate_range) {
+ _delegate = delegate_reader(*_delegate_range, _slice, _pc);
+ if (!e) {
return make_ready_future<streamed_mutation_opt>(stdx::nullopt);
+ } else {
+ return make_ready_future<streamed_mutation_opt>((*e).read(mtbl(), schema(), query::full_slice));
}
- memtable_entry& e = *_i;
- ++_i;
- _last = e.key();
- _memtable->upgrade_entry(e);
- return make_ready_future<streamed_mutation_opt>(e.read(_memtable, _schema, _slice));
}
};

@@ -219,7 +286,7 @@ memtable::make_reader(schema_ptr s,

mutation_reader
memtable::make_flush_reader(schema_ptr s) {
- return make_mutation_reader<scanning_reader>(std::move(s), shared_from_this(), query::full_partition_range, query::full_slice, default_priority_class());

Glauber Costa

<glauber@scylladb.com>
unread,
Sep 28, 2016, 9:05:36 PM9/28/16
to scylladb-dev@googlegroups.com, Glauber Costa
Signed-off-by: Glauber Costa <gla...@scylladb.com>
---
utils/logalloc.hh | 2 ++
utils/logalloc.cc | 8 ++++++++
2 files changed, 10 insertions(+)

diff --git a/utils/logalloc.hh b/utils/logalloc.hh
index 1b104ec..a38307a 100644
--- a/utils/logalloc.hh
+++ b/utils/logalloc.hh
@@ -566,6 +566,8 @@ class region {

allocation_strategy& allocator();

+ region_group* group();
+
// Merges another region into this region. The other region is left empty.
// Doesn't invalidate references to allocated objects.
void merge(region& other);
diff --git a/utils/logalloc.cc b/utils/logalloc.cc
index cef52c9..0c12b28 100644
--- a/utils/logalloc.cc
+++ b/utils/logalloc.cc
@@ -1295,6 +1295,10 @@ class region_impl : public allocation_strategy {
return total;
}

+ region_group* group() {
+ return _group;
+ }
+
occupancy_stats compactible_occupancy() const {
return _closed_occupancy;
}
@@ -1622,6 +1626,10 @@ occupancy_stats region::occupancy() const {
return _impl->occupancy();
}

+region_group* region::group() {
+ return _impl->group();
+}
+
void region::merge(region& other) {
if (_impl != other._impl) {
_impl->merge(*other._impl);
--
2.5.5

Glauber Costa

<glauber@scylladb.com>
unread,
Sep 28, 2016, 9:05:37 PM9/28/16
to scylladb-dev@googlegroups.com, Glauber Costa
This is so we can template it without worrying about declaring the
specializations in the .cc file.

Signed-off-by: Glauber Costa <gla...@scylladb.com>
---
partition_version.hh | 172 +++++++++++++++++++++++++++++++++++++++++++++----
partition_version.cc | 178 ---------------------------------------------------
2 files changed, 161 insertions(+), 189 deletions(-)

diff --git a/partition_version.hh b/partition_version.hh
index 45a3e13..2722df6 100644
--- a/partition_version.hh
+++ b/partition_version.hh
@@ -311,23 +311,173 @@ class partition_snapshot_reader : public streamed_mutation::impl {
uint64_t _reclaim_counter;
unsigned _version_count = 0;
private:
- void refresh_iterators();
- void pop_clustering_row();
+ void refresh_iterators() {
+ _clustering_rows.clear();

- mutation_fragment_opt read_static_row();
- mutation_fragment_opt read_next();
- void do_fill_buffer();
- static tombstone tomb(partition_snapshot& snp);
+ if (!_in_ck_range && _current_ck_range == _ck_range_end) {
+ return;
+ }
+
+ for (auto&& v : _snapshot->versions()) {
+ auto cr_end = v.partition().upper_bound(*_schema, *_current_ck_range);
+ auto cr = [&] () -> mutation_partition::rows_type::const_iterator {
+ if (_in_ck_range) {
+ return v.partition().clustered_rows().upper_bound(*_last_entry, _cmp);
+ } else {
+ return v.partition().lower_bound(*_schema, *_current_ck_range);
+ }
+ }();
+
+ if (cr != cr_end) {
+ _clustering_rows.emplace_back(rows_position { cr, cr_end });
+ }
+ }
+
+ _in_ck_range = true;
+ boost::range::make_heap(_clustering_rows, heap_compare(_cmp));
+ }
+
+ void pop_clustering_row() {
+ auto& current = _clustering_rows.back();
+ current._position = std::next(current._position);
+ if (current._position == current._end) {
+ _clustering_rows.pop_back();
+ } else {
+ boost::range::push_heap(_clustering_rows, heap_compare(_cmp));
+ }
+ }
+
+ mutation_fragment_opt read_static_row() {
+ _last_entry = position_in_partition(position_in_partition::static_row_tag_t());
+ mutation_fragment_opt sr;
+ for (auto&& v : _snapshot->versions()) {
+ if (!v.partition().static_row().empty()) {
+ if (!sr) {
+ sr = mutation_fragment(static_row(v.partition().static_row()));
+ } else {
+ sr->as_static_row().apply(*_schema, v.partition().static_row());
+ }
+ }
+ }
+ return sr;
+ }
+
+ mutation_fragment_opt read_next() {
+ if (!_clustering_rows.empty()) {
+ auto mf = _range_tombstones.get_next(*_clustering_rows.front()._position);
+ if (mf) {
+ return mf;
+ }
+
+ boost::range::pop_heap(_clustering_rows, heap_compare(_cmp));
+ clustering_row result = *_clustering_rows.back()._position;
+ pop_clustering_row();
+ while (!_clustering_rows.empty() && _eq(*_clustering_rows.front()._position, result)) {
+ boost::range::pop_heap(_clustering_rows, heap_compare(_cmp));
+ auto& current = _clustering_rows.back();
+ result.apply(*_schema, *current._position);
+ pop_clustering_row();
+ }
+ _last_entry = result.position();
+ return mutation_fragment(std::move(result));
+ }
+ return _range_tombstones.get_next();
+ }
+
+ void do_fill_buffer() {
+ if (!_last_entry) {
+ auto mfopt = read_static_row();
+ if (mfopt) {
+ _buffer.emplace_back(std::move(*mfopt));
+ }
+ }
+
+ if (!_in_ck_range || _lsa_region.reclaim_counter() != _reclaim_counter || _snapshot->version_count() != _version_count) {
+ refresh_iterators();
+ _reclaim_counter = _lsa_region.reclaim_counter();
+ _version_count = _snapshot->version_count();
+ }
+
+ while (!is_end_of_stream() && !is_buffer_full()) {
+ if (_in_ck_range && _clustering_rows.empty()) {
+ _in_ck_range = false;
+ _current_ck_range = std::next(_current_ck_range);
+ refresh_iterators();
+ continue;
+ }
+
+ auto mfopt = read_next();
+ if (mfopt) {
+ _buffer.emplace_back(std::move(*mfopt));
+ } else {
+ _end_of_stream = true;
+ }
+ }
+ }
+
+ static tombstone tomb(partition_snapshot& snp) {
+ tombstone t;
+ for (auto& v : snp.versions()) {
+ t.apply(v.partition().partition_tombstone());
+ }
+ return t;
+ }
public:
partition_snapshot_reader(schema_ptr s, dht::decorated_key dk, lw_shared_ptr<partition_snapshot> snp,
query::clustering_key_filter_ranges crr,
logalloc::region& region, logalloc::allocating_section& read_section,
- boost::any pointer_to_container);
- ~partition_snapshot_reader();
- virtual future<> fill_buffer() override;
+ boost::any pointer_to_container)
+ : streamed_mutation::impl(s, std::move(dk), tomb(*snp))
+ , _container_guard(std::move(pointer_to_container))
+ , _ck_ranges(std::move(crr))
+ , _current_ck_range(_ck_ranges.begin())
+ , _ck_range_end(_ck_ranges.end())
+ , _cmp(*s)
+ , _eq(*s)
+ , _snapshot(snp)
+ , _range_tombstones(*s)
+ , _lsa_region(region)
+ , _read_section(read_section) {
+ for (auto&& v : _snapshot->versions()) {
+ _range_tombstones.apply(v.partition().row_tombstones());
+ }
+ do_fill_buffer();
+ }
+
+ ~partition_snapshot_reader() {
+ if (!_snapshot.owned()) {
+ return;
+ }
+ // If no one else is using this particular snapshot try to merge partition
+ // versions.
+ with_allocator(_lsa_region.allocator(), [this] {
+ return with_linearized_managed_bytes([this] {
+ try {
+ _read_section(_lsa_region, [this] {
+ _snapshot->merge_partition_versions();
+ _snapshot = {};
+ });
+ } catch (...) { }
+ });
+ });
+ }
+
+ virtual future<> fill_buffer() override {
+ return _read_section(_lsa_region, [&] {
+ return with_linearized_managed_bytes([&] {
+ do_fill_buffer();
+ return make_ready_future<>();
+ });
+ });
+ }
};

-streamed_mutation make_partition_snapshot_reader(schema_ptr s, dht::decorated_key dk,
+inline streamed_mutation
+make_partition_snapshot_reader(schema_ptr s, dht::decorated_key dk,
query::clustering_key_filter_ranges crr,
lw_shared_ptr<partition_snapshot> snp, logalloc::region& region,
- logalloc::allocating_section& read_section, boost::any pointer_to_container);
+ logalloc::allocating_section& read_section, boost::any pointer_to_container)
+{
+ return make_streamed_mutation<partition_snapshot_reader>(s, std::move(dk),
+ snp, std::move(crr), region, read_section, std::move(pointer_to_container));
+}
diff --git a/partition_version.cc b/partition_version.cc
index 0a92a14..e61acde 100644
--- a/partition_version.cc
+++ b/partition_version.cc
@@ -291,181 +291,3 @@ lw_shared_ptr<partition_snapshot> partition_entry::read(schema_ptr entry_schema)
return snp;
}
}
-
-partition_snapshot_reader::partition_snapshot_reader(schema_ptr s, dht::decorated_key dk,
- lw_shared_ptr<partition_snapshot> snp,
- query::clustering_key_filter_ranges crr, logalloc::region& region,
- logalloc::allocating_section& read_section, boost::any pointer_to_container)
- : streamed_mutation::impl(s, std::move(dk), tomb(*snp))
- , _container_guard(std::move(pointer_to_container))
- , _ck_ranges(std::move(crr))
- , _current_ck_range(_ck_ranges.begin())
- , _ck_range_end(_ck_ranges.end())
- , _cmp(*s)
- , _eq(*s)
- , _snapshot(snp)
- , _range_tombstones(*s)
- , _lsa_region(region)
- , _read_section(read_section)
-{
- for (auto&& v : _snapshot->versions()) {
- _range_tombstones.apply(v.partition().row_tombstones());
- }
- do_fill_buffer();
-}
-
-partition_snapshot_reader::~partition_snapshot_reader()
-{
- if (!_snapshot.owned()) {
- return;
- }
- // If no one else is using this particular snapshot try to merge partition
- // versions.
- with_allocator(_lsa_region.allocator(), [this] {
- return with_linearized_managed_bytes([this] {
- try {
- _read_section(_lsa_region, [this] {
- _snapshot->merge_partition_versions();
- _snapshot = {};
- });
- } catch (...) { }
- });
- });
-}
-
-tombstone partition_snapshot_reader::tomb(partition_snapshot& snp)
-{
- tombstone t;
- for (auto& v : snp.versions()) {
- t.apply(v.partition().partition_tombstone());
- }
- return t;
-}
-
-mutation_fragment_opt partition_snapshot_reader::read_static_row()
-{
- _last_entry = position_in_partition(position_in_partition::static_row_tag_t());
- mutation_fragment_opt sr;
- for (auto&& v : _snapshot->versions()) {
- if (!v.partition().static_row().empty()) {
- if (!sr) {
- sr = mutation_fragment(static_row(v.partition().static_row()));
- } else {
- sr->as_static_row().apply(*_schema, v.partition().static_row());
- }
- }
- }
- return sr;
-}
-
-void partition_snapshot_reader::refresh_iterators()
-{
- _clustering_rows.clear();
-
- if (!_in_ck_range && _current_ck_range == _ck_range_end) {
- return;
- }
-
- for (auto&& v : _snapshot->versions()) {
- auto cr_end = v.partition().upper_bound(*_schema, *_current_ck_range);
- auto cr = [&] () -> mutation_partition::rows_type::const_iterator {
- if (_in_ck_range) {
- return v.partition().clustered_rows().upper_bound(*_last_entry, _cmp);
- } else {
- return v.partition().lower_bound(*_schema, *_current_ck_range);
- }
- }();
-
- if (cr != cr_end) {
- _clustering_rows.emplace_back(rows_position { cr, cr_end });
- }
- }
-
- _in_ck_range = true;
- boost::range::make_heap(_clustering_rows, heap_compare(_cmp));
-}
-
-void partition_snapshot_reader::pop_clustering_row()
-{
- auto& current = _clustering_rows.back();
- current._position = std::next(current._position);
- if (current._position == current._end) {
- _clustering_rows.pop_back();
- } else {
- boost::range::push_heap(_clustering_rows, heap_compare(_cmp));
- }
-}
-
-mutation_fragment_opt partition_snapshot_reader::read_next()
-{
- if (!_clustering_rows.empty()) {
- auto mf = _range_tombstones.get_next(*_clustering_rows.front()._position);
- if (mf) {
- return mf;
- }
-
- boost::range::pop_heap(_clustering_rows, heap_compare(_cmp));
- clustering_row result = *_clustering_rows.back()._position;
- pop_clustering_row();
- while (!_clustering_rows.empty() && _eq(*_clustering_rows.front()._position, result)) {
- boost::range::pop_heap(_clustering_rows, heap_compare(_cmp));
- auto& current = _clustering_rows.back();
- result.apply(*_schema, *current._position);
- pop_clustering_row();
- }
- _last_entry = result.position();
- return mutation_fragment(std::move(result));
- }
- return _range_tombstones.get_next();
-}
-
-void partition_snapshot_reader::do_fill_buffer()
-{
- if (!_last_entry) {
- auto mfopt = read_static_row();
- if (mfopt) {
- _buffer.emplace_back(std::move(*mfopt));
- }
- }
-
- if (!_in_ck_range || _lsa_region.reclaim_counter() != _reclaim_counter || _snapshot->version_count() != _version_count) {
- refresh_iterators();
- _reclaim_counter = _lsa_region.reclaim_counter();
- _version_count = _snapshot->version_count();
- }
-
- while (!is_end_of_stream() && !is_buffer_full()) {
- if (_in_ck_range && _clustering_rows.empty()) {
- _in_ck_range = false;
- _current_ck_range = std::next(_current_ck_range);
- refresh_iterators();
- continue;
- }
-
- auto mfopt = read_next();
- if (mfopt) {
- _buffer.emplace_back(std::move(*mfopt));
- } else {
- _end_of_stream = true;
- }
- }
-}
-
-future<> partition_snapshot_reader::fill_buffer()
-{
- return _read_section(_lsa_region, [&] {
- return with_linearized_managed_bytes([&] {
- do_fill_buffer();
- return make_ready_future<>();
- });
- });
-}
-
-streamed_mutation make_partition_snapshot_reader(schema_ptr s, dht::decorated_key dk,
- query::clustering_key_filter_ranges crr,
- lw_shared_ptr<partition_snapshot> snp, logalloc::region& region,
- logalloc::allocating_section& read_section, boost::any pointer_to_container)
-{
- return make_streamed_mutation<partition_snapshot_reader>(s, std::move(dk),
- snp, std::move(crr), region, read_section, std::move(pointer_to_container));
-}
--
2.5.5

Glauber Costa

<glauber@scylladb.com>
unread,
Sep 28, 2016, 9:05:38 PM9/28/16
to scylladb-dev@googlegroups.com, Glauber Costa
By default, we don't do any accounting. By specializing this class and providing
an accounter class, we can account how much memory are we reading as we read
through the elements.

Signed-off-by: Glauber Costa <gla...@scylladb.com>
---
partition_version.hh | 40 +++++++++++++++++++++++++++++++++++-----
1 file changed, 35 insertions(+), 5 deletions(-)

diff --git a/partition_version.hh b/partition_version.hh
index 2722df6..418a0e1 100644
--- a/partition_version.hh
+++ b/partition_version.hh
@@ -271,7 +271,15 @@ inline partition_version_ref& partition_snapshot::version()
}
}

-class partition_snapshot_reader : public streamed_mutation::impl {
+struct partition_snapshot_reader_dummy_accounter {
+ void operator()(const clustering_row& cr) {}
+ void operator()(const static_row& sr) {}
+ void operator()(const range_tombstone& rt) {}
+};
+extern partition_snapshot_reader_dummy_accounter no_accounter;
+
+template <typename MemoryAccounter = partition_snapshot_reader_dummy_accounter>
+class partition_snapshot_reader : public streamed_mutation::impl, public MemoryAccounter {
struct rows_position {
mutation_partition::rows_type::const_iterator _position;
mutation_partition::rows_type::const_iterator _end;
@@ -308,6 +316,10 @@ class partition_snapshot_reader : public streamed_mutation::impl {
logalloc::region& _lsa_region;
logalloc::allocating_section& _read_section;

+ MemoryAccounter& mem_accounter() {
+ return *this;
+ }
+
uint64_t _reclaim_counter;
unsigned _version_count = 0;
private:
@@ -384,11 +396,16 @@ class partition_snapshot_reader : public streamed_mutation::impl {
return _range_tombstones.get_next();
}

+ void emplace_mutation_fragment(mutation_fragment&& mfopt) {
+ mfopt.visit(mem_accounter());
+ push_mutation_fragment(std::move(mfopt));
+ }
+
void do_fill_buffer() {
if (!_last_entry) {
auto mfopt = read_static_row();
if (mfopt) {
- _buffer.emplace_back(std::move(*mfopt));
+ emplace_mutation_fragment(std::move(*mfopt));
}
}

@@ -408,7 +425,7 @@ class partition_snapshot_reader : public streamed_mutation::impl {

auto mfopt = read_next();
if (mfopt) {
- _buffer.emplace_back(std::move(*mfopt));
+ emplace_mutation_fragment(std::move(*mfopt));
} else {
_end_of_stream = true;
}
@@ -423,11 +440,13 @@ class partition_snapshot_reader : public streamed_mutation::impl {
return t;
}
public:
+ template <typename... Args>
partition_snapshot_reader(schema_ptr s, dht::decorated_key dk, lw_shared_ptr<partition_snapshot> snp,
query::clustering_key_filter_ranges crr,
logalloc::region& region, logalloc::allocating_section& read_section,
- boost::any pointer_to_container)
+ boost::any pointer_to_container, Args&&... args)
: streamed_mutation::impl(s, std::move(dk), tomb(*snp))
+ , MemoryAccounter(std::forward<Args>(args)...)
, _container_guard(std::move(pointer_to_container))
, _ck_ranges(std::move(crr))
, _current_ck_range(_ck_ranges.begin())
@@ -472,12 +491,23 @@ class partition_snapshot_reader : public streamed_mutation::impl {
}
};

+template <typename MemoryAccounter, typename... Args>
+inline streamed_mutation
+make_partition_snapshot_reader(schema_ptr s, dht::decorated_key dk,
+ query::clustering_key_filter_ranges crr,
+ lw_shared_ptr<partition_snapshot> snp, logalloc::region& region,
+ logalloc::allocating_section& read_section, boost::any pointer_to_container, Args&&... args)
+{
+ return make_streamed_mutation<partition_snapshot_reader<MemoryAccounter>>(s, std::move(dk),
+ snp, std::move(crr), region, read_section, std::move(pointer_to_container), std::forward<Args>(args)...);
+}
+
inline streamed_mutation
make_partition_snapshot_reader(schema_ptr s, dht::decorated_key dk,
query::clustering_key_filter_ranges crr,
lw_shared_ptr<partition_snapshot> snp, logalloc::region& region,
logalloc::allocating_section& read_section, boost::any pointer_to_container)
{
- return make_streamed_mutation<partition_snapshot_reader>(s, std::move(dk),
+ return make_streamed_mutation<partition_snapshot_reader<>>(s, std::move(dk),
snp, std::move(crr), region, read_section, std::move(pointer_to_container));
}
--
2.5.5

It is loading more messages.
0 new messages