[RFC v4 1/4] flashcache: copy memcached sources

75 views
Skip to first unread message

Paweł Dziepak

<pdziepak@quarnos.org>
unread,
May 7, 2015, 12:21:22 PM5/7/15
to seastar-dev@googlegroups.com, Paweł Dziepak
Signed-off-by: Paweł Dziepak <pdzi...@quarnos.org>
---
apps/{memcached => flashcache}/ascii.rl | 0
apps/{memcached/memcache.cc => flashcache/flashcache.cc} | 2 +-
apps/{memcached/memcached.hh => flashcache/flashcache.hh} | 0
configure.py | 2 ++
4 files changed, 3 insertions(+), 1 deletion(-)
copy apps/{memcached => flashcache}/ascii.rl (100%)
copy apps/{memcached/memcache.cc => flashcache/flashcache.cc} (99%)
copy apps/{memcached/memcached.hh => flashcache/flashcache.hh} (100%)

diff --git a/apps/memcached/ascii.rl b/apps/flashcache/ascii.rl
similarity index 100%
copy from apps/memcached/ascii.rl
copy to apps/flashcache/ascii.rl
diff --git a/apps/memcached/memcache.cc b/apps/flashcache/flashcache.cc
similarity index 99%
copy from apps/memcached/memcache.cc
copy to apps/flashcache/flashcache.cc
index d9f1b1a..7afb29d 100644
--- a/apps/memcached/memcache.cc
+++ b/apps/flashcache/flashcache.cc
@@ -41,7 +41,7 @@
#include "net/api.hh"
#include "net/packet-data-source.hh"
#include "apps/memcached/ascii.hh"
-#include "memcached.hh"
+#include "flashcache.hh"
#include <unistd.h>

#define PLATFORM "seastar"
diff --git a/apps/memcached/memcached.hh b/apps/flashcache/flashcache.hh
similarity index 100%
copy from apps/memcached/memcached.hh
copy to apps/flashcache/flashcache.hh
diff --git a/configure.py b/configure.py
index ccae7c4..2d1444d 100755
--- a/configure.py
+++ b/configure.py
@@ -152,6 +152,7 @@ apps = [
'apps/seawreck/seawreck',
'apps/seastar/seastar',
'apps/memcached/memcached',
+ 'apps/flashcache/flashcache',
]

all_artifacts = apps + tests + ['libseastar.a', 'seastar.pc']
@@ -273,6 +274,7 @@ deps = {
'tests/test-reactor': ['tests/test-reactor.cc'] + core,
'apps/httpd/httpd': ['apps/httpd/demo.json', 'apps/httpd/main.cc'] + http + libnet + core,
'apps/memcached/memcached': ['apps/memcached/memcache.cc'] + memcache_base,
+ 'apps/flashcache/flashcache': ['apps/flashcache/flashcache.cc'] + memcache_base,
'tests/memcached/test_ascii_parser': ['tests/memcached/test_ascii_parser.cc'] + memcache_base + boost_test_lib,
'tests/fileiotest': ['tests/fileiotest.cc'] + core,
'tests/directory_test': ['tests/directory_test.cc'] + core,
--
2.1.4

Paweł Dziepak

<pdziepak@quarnos.org>
unread,
May 7, 2015, 12:21:22 PM5/7/15
to seastar-dev@googlegroups.com, Paweł Dziepak
Signed-off-by: Paweł Dziepak <pdzi...@quarnos.org>
---
core/{sleep.hh => condvar.hh} | 53 +++++++++++++++++++++++++++----------------
1 file changed, 33 insertions(+), 20 deletions(-)
copy core/{sleep.hh => condvar.hh} (50%)

diff --git a/core/sleep.hh b/core/condvar.hh
similarity index 50%
copy from core/sleep.hh
copy to core/condvar.hh
index 24b3f54..034d70e 100644
--- a/core/sleep.hh
+++ b/core/condvar.hh
@@ -15,30 +15,43 @@
* specific language governing permissions and limitations
* under the License.
*/
-
/*
* Copyright (C) 2015 Cloudius Systems, Ltd.
*/
+#ifndef CONDVAR_HH_
+#define CONDVAR_HH_

-#pragma once
-
-#include <chrono>
-#include <functional>
-
-#include "core/shared_ptr.hh"
#include "core/reactor.hh"
-#include "core/future.hh"
+#include "core/future-util.hh"
+#include "core/circular_buffer.hh"

-template <typename Clock = std::chrono::high_resolution_clock, typename Rep, typename Period>
-future<> sleep(std::chrono::duration<Rep, Period> dur) {
- struct sleeper {
- promise<> done;
- timer<Clock> tmr;
- sleeper(std::chrono::duration<Rep, Period> dur)
- : tmr([this] { done.set_value(); delete this; })
- {
- tmr.arm(dur);
+class condition_variable {
+public:
+ future<> wait() {
+ _wait_list.emplace_back();
+ return _wait_list.back().get_future();
+ }
+ template<typename Predicate>
+ future<> wait(Predicate&& pred) {
+ if (pred()) {
+ return make_ready_future<>();
}
- };
- return (new sleeper(dur))->done.get_future();
-}
+ return wait().then([this, pred = std::forward<Predicate>(pred)] {
+ return wait(std::move(pred));
+ });
+ }
+ void notify_one() {
+ auto& p = _wait_list.front();
+ p.set_value();
+ _wait_list.pop_front();
+ }
+ void notify_all() {
+ while (!_wait_list.empty()) {
+ notify_one();
+ }
+ }
+private:
+ circular_buffer<promise<>> _wait_list;
+};
+
+#endif
--
2.1.4

Paweł Dziepak

<pdziepak@quarnos.org>
unread,
May 7, 2015, 12:21:22 PM5/7/15
to seastar-dev@googlegroups.com, Paweł Dziepak
Hello,
This is the fourth version of the flashcache patchset, hopefully the last one
without the support of on disk storage. Most of the changes in v4 were made to
improve performance. This includes changing the way per-object metadata is
stored. In the previous versions object data was directly preceeded by an
instance of log_alloc::metadata class which kept all the information needed
by the log-structured allocator. Now, this class is gone. Reference counting is
done on per segment basis and the rest of the metadata is stored directly inside
the owner object using log_alloc_ptr objects as hooks. In order to allow
compactor to iterate through objects in a block each segments contains an
intrusive list of log_alloc_ptr instances representing the objects which first
byte is in that segments.

Below are summaries of the performance tests I did. Like before the tests were
run on two c4.8xlarge EC2 instances, but this time native network stack and DPDK
was used. The server is running in a single core mode. Unless stated otherwise
the segment size is 16kB, eviction step 10%. Server was restared between each
test (except the last test case). All tests were run three times for 60 seconds,
all results are medians of these three runs (based on net rate). Raw data
(not very readable) is available here:
https://gist.github.com/pdziepak/8d12676606e7a7b2e1f5

memaslap defaults: get:set ratio 9:1, key size 64, value size 1024
(virtually) unlimited memory, no get misses
single threaded client multi (32) threaded client
memcached flashcache diff memcached flashcache diff
Net rate: 287.5M/s 286.1M/s -0.4% 312.7M/s 302.6M/s -3.2%
TPS: 256920 255681 279449 270425

get:set ratio 1:1, key size 64, value size 1024
(virtually) unlimited memory, no get misses
single threaded client multi (32) threaded client
memcached flashcache diff memcached flashcache diff
Net rate: 263.0M/s 261.9M/s -0.4% 291.1M/s 277.6M/s -4.6%
TPS: 240572 239503 266178 253941

get:set ratio 1:1, key size 64, value size 1024
object memory limited to 128MB, segment size 16kB, eviction step 10%
single threaded client
memcached flashcache diff
Net rate: 153.9M/s 155.2M/s +0.8%
TPS: 243198 248976 +2.4%
Misses: 6377731 6661956 +4.5%

get:set ratio 1:1, key size 64, value size 1024
object memory limited to 128MB, segment size 16kB, eviction step 20%
single threaded client
memcached flashcache diff
Net rate: 153.9M/s 149.1M/s -3.1%
TPS: 243198 240951 -0.9%
Misses: 6377731 6513303 +2.1%

get:set ratio 1:1, key size 64, value size 1024
object memory limited to 128MB, segment size 16kB, eviction step 5%
single threaded client
memcached flashcache diff
Net rate: 153.9M/s 149.8M/s -2.7%
TPS: 243198 238915 -1.8%
Misses: 6377731 6347843 -0.5%

get:set ratio 1:1, key size 64, value size 1024
object memory limited to 128MB, segment size 4kB, eviction step 5%
single threaded client
memcached flashcache diff
Net rate: 153.9M/s 142.6M/s -7.3%
TPS: 243198 227205 -6.6%
Misses: 6377731 6028231 -5.5%

two consecutive (without server restart) runs, both with get:set ratio 1:1 and
key size 64 but the value size in the first on was 128 and in the second 1024
object memory limited to 128MB, segment size 16kB, eviction step 10%
single threaded client
memcached 128 memcached 1024 flashcache 128 flashcache 1024
Net rate: 44.0M/s 129.5M/s 41.0M/s 156.0M/s
TPS: 245946 228162 238356 250367
Misses: 4400901 6834405 4748966 6703696

changes in v4:
- log_alloc::metadata replaced by log_alloc_ptr hook
- reference counting moved from objects to segments
- condition_wait replaced by condition variable implementation
- fixed statistics collaction in flashcache
changes in v3:
- blocks with less than a segment of unused space can be compacted
- iovecs abstraction replaced by log_alloc::object_view, no std::vector<> in
log_alloc implementation
- log_alloc::object class introduced
- fixed eviction target computation in flashcache
- log_alloc::alloc() takes continuation as an argument instead of using
standard alloc().then() solution in order to prevent race condition with
update_reference() calls in compaction
- removed metadata::removed flag, no metadata::_owner indicates removed
object
- segment allocator doesn't need upper bound on the number of segments anymore
changes in v2:
- added special case for objects which size is around a multiply of a segment
size
- evictor takes into account space wasted inside blocks
- flush all in flashache should now work correctly regardless of available
amount of luck
- added some comments, code reorganized a bit to improve readability
- concept of incomplete object dropped
- introduced iovecs class for handling allocated buffers
- segment_allocator no longer allocates segments in slabs
- different way of freeing empty segments

Paweł Dziepak (4):
flashcache: copy memcached sources
core: add condition variable implementation
core: add log structured memory allocator
flashcache: use log-structured allocator

apps/{memcached => flashcache}/ascii.rl | 0
.../memcache.cc => flashcache/flashcache.cc} | 864 +++++++++++-----
.../memcached.hh => flashcache/flashcache.hh} | 0
configure.py | 2 +
core/{sleep.hh => condvar.hh} | 53 +-
core/log_alloc.hh | 1043 ++++++++++++++++++++
6 files changed, 1677 insertions(+), 285 deletions(-)
copy apps/{memcached => flashcache}/ascii.rl (100%)
copy apps/{memcached/memcache.cc => flashcache/flashcache.cc} (63%)
copy apps/{memcached/memcached.hh => flashcache/flashcache.hh} (100%)
copy core/{sleep.hh => condvar.hh} (50%)
create mode 100644 core/log_alloc.hh

--
2.1.4

Paweł Dziepak

<pdziepak@quarnos.org>
unread,
May 7, 2015, 12:21:25 PM5/7/15
to seastar-dev@googlegroups.com, Paweł Dziepak
Signed-off-by: Paweł Dziepak <pdzi...@quarnos.org>
---
core/log_alloc.hh | 1043 +++++++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 1043 insertions(+)
create mode 100644 core/log_alloc.hh

diff --git a/core/log_alloc.hh b/core/log_alloc.hh
new file mode 100644
index 0000000..d441fd1
--- /dev/null
+++ b/core/log_alloc.hh
@@ -0,0 +1,1043 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Copyright (C) 2015 Cloudius Systems, Ltd.
+ */
+#ifndef LOG_ALLOC_HH
+#define LOG_ALLOC_HH
+
+#include <cassert>
+#include <memory>
+#include <ratio>
+#include <vector>
+
+#include <boost/intrusive/list.hpp>
+#include <boost/intrusive/set.hpp>
+
+#include "core/prefetch.hh"
+
+#include "core/align.hh"
+#include "core/reactor.hh"
+#include "core/memory.hh"
+#include "core/future-util.hh"
+#include "core/condvar.hh"
+
+// Segment allocator
+//
+// This is a segment provider for the log-structured allocator and fix_alloc.
+// Its purpose is to interface with the system allocator (including reclaim
+// hook), provide segment accounting, an ability to reserve segments which
+// won't be allocated unless the allocator is in the emergency mode and
+// an ability to temporary disable all allocations.
+// TotalSegments is the maximum number of segments that may be allocated, if
+// it's set to zero then no such limit is enforced.
+template<size_t SegmentSize, size_t TotalSegments = 0>
+class segment_allocator {
+private:
+ struct segment : boost::intrusive::list_base_hook<> { };
+public:
+ typedef std::conditional_t<(SegmentSize > std::numeric_limits<uint16_t>::max()),
+ uint32_t, uint16_t> size_type;
+ static_assert(SegmentSize <= std::numeric_limits<uint32_t>::max(),
+ "segment size is too big");
+ constexpr static size_type segment_size = SegmentSize;
+
+ segment_allocator() : _enabled(true), _total_count(0), _used_count(0),
+ _reserved_count(0), _emergency(0) {
+ }
+ ~segment_allocator() {
+ _reserved_count = 0;
+ free_memory();
+ }
+
+ void* alloc() {
+ while (!unused() && !(_emergency && _total_count - _used_count > 0)) {
+ if (!alloc_segment()) {
+ return nullptr;
+ }
+ }
+ auto& seg = _list.front();
+ _list.pop_front();
+ seg.~segment();
+ _used_count++;
+ return &seg;
+ }
+ void free(void* ptr) {
+ auto& seg = *new (ptr) segment;
+ _list.push_front(seg);
+ _used_count--;
+ }
+ size_t prealloc(size_t segments) {
+ if (segments <= unused()) {
+ return unused();
+ }
+ while (unused() < segments) {
+ auto ret = alloc_segment();
+ if (!ret) {
+ break;
+ }
+ }
+ return unused();
+ }
+ bool reserve(size_t segments) {
+ // reserved segments will be available only if _emergency > 0
+ while (unused() < segments) {
+ auto ret = alloc_segment();
+ if (!ret) {
+ return false;
+ }
+ }
+ _reserved_count += segments;
+ return true;
+ }
+
+ template<typename Cont>
+ std::result_of_t<Cont()> when_ready(Cont&& cont) {
+ if (_enabled) {
+ return cont();
+ }
+ _wait_list.emplace_back();
+ auto f = _wait_list.back().get_future();
+ return std::move(f).then([this, cont = std::forward<Cont>(cont)] () mutable {
+ return when_ready(std::move(cont));
+ });
+ }
+ void enable() {
+ _enabled = true;
+ while (!_wait_list.empty()) {
+ auto& p = _wait_list.back();
+ p.set_value();
+ _wait_list.pop_back();
+ }
+ }
+ void disable() {
+ _enabled = false;
+ }
+
+ size_t total() const { return _total_count; }
+ size_t used() const {
+ return std::min(_used_count + _reserved_count, _total_count);
+ }
+ size_t unused() const {
+ return _total_count - used();
+ }
+ size_t reserved_deficit() const {
+ if (_total_count - _used_count >= _reserved_count) {
+ return 0;
+ }
+ return _used_count + _reserved_count - _total_count;
+ }
+
+ void enter_emergency() { _emergency++; }
+ void exit_emergency() { _emergency--; }
+
+ void free_memory() {
+ if (!_enabled) {
+ return;
+ }
+ while (unused()) {
+ assert(!_list.empty());
+ auto& seg = _list.back();
+ _list.pop_back();
+ _total_count--;
+ seg.~segment();
+ auto ptr = reinterpret_cast<char*>(&seg);
+ delete[] ptr;
+ }
+ }
+private:
+ bool alloc_segment() {
+ if (TotalSegments && _total_count >= TotalSegments) {
+ return false;
+ }
+ char* ptr;
+ try {
+ ptr = new (with_alignment(SegmentSize)) char[SegmentSize];
+ auto addr = reinterpret_cast<uintptr_t>(ptr);
+ assert(!(addr & (SegmentSize - 1)));
+ } catch (const std::bad_alloc&) {
+ return false;
+ }
+ auto& seg = *new (ptr) segment;
+ _list.push_back(seg);
+ _total_count++;
+ return true;
+ }
+private:
+ boost::intrusive::list<segment> _list;
+ //semaphore _enabled;
+ bool _enabled;
+ std::vector<promise<>> _wait_list;
+ size_t _total_count;
+ size_t _used_count;
+ size_t _reserved_count;
+ unsigned _emergency;
+};
+
+// Log-structured memory allocator
+//
+// This is the main logic of the log-structured memory allocator.
+// Allocated objects are appended in a log-like manner to the current block.
+// Blocks are made of at least one continuous memory segment, blocks themselves
+// do not need to be continuous. Each segment has the list of objects that
+// it contains.
+//
+// Addition:
+// - if there is no current working block a new segment is allocated and
+// log_alloc::block created at the beginning of usable space and the object
+// is appended directly after the last meaningful data inside the block
+// (either another object or per-block metadata), the information about
+// object is also added to the list of objects in the segment
+//
+// Removal:
+// - the object is removed from the list of objects in the segment
+// - the accounting of unused data (both global, per-segment and per-block)
+// is updated and then the block is scanned and empty and unlocked segments
+// are freed
+//
+// Locking objects:
+// - object may need to be locked if it is going to be used and it must be
+// ensured that its neither removed nor moved somewhere else during compaction
+// - locking is done on per-segment basis, so each time an object is locked
+// a reference counts of all segments that keep its data is increased
+// - when an object is unlocked, it is checked whether it was removed
+// in the meantime if it was, the removal is completed
+//
+// Reclaiming segments:
+// - evictor is called and expected to free sufficient amount of memory
+// - blocks are compacted starting from the one with most wasted space,
+// during compaction the surviving objects are moved to the new block and
+// their references are updated, during compaction the segment allocator
+// is in the emergency mode so that segments from the reserved pool may be
+// used
+// - the compaction stops if either the sufficent number of segments was
+// reclaimed or it cannot proceed, assuming that eviction freed enough memory
+// the compaction won't be able to proceed only if some segments couldn't be
+// freed because of locked objects, if that's the case the reclaimer waits
+// for them to be unlocked, otherwise the evictor is called again
+//
+// There is also a special case handling of objects which size is roughly a
+// multiply of a segment size. In such case there is no point in appending them
+// to the current block, instead just the appropriate amount of segments is
+// allocated and returned. Since such block contains only a single object
+// there is no need for compaction and all its segments can be freed immediately
+// once the object is freed (and unlocked). There is also no need for per-block
+// metadata.
+
+// Per-object metadata, supposed to be kept by an owner of the object
+// allocated using log_alloc.
+struct log_alloc_ptr : public boost::intrusive::list_base_hook<> {
+ void* data;
+ size_t size;
+
+ log_alloc_ptr() : data(nullptr), size(0) { }
+ operator bool() const {
+ return data;
+ }
+};
+
+template<typename SegmentAllocator, typename Evictor,
+ typename MemoryWasteTarget = std::ratio<1, 20>>
+class log_alloc {
+private:
+ using segment_size_type = typename SegmentAllocator::size_type;
+
+ struct block;
+
+ // Per-segment metadata
+ struct segment {
+ segment* next;
+ block* blk;
+ boost::intrusive::list<log_alloc_ptr,
+ boost::intrusive::constant_time_size<false>
+ > objects;
+ unsigned ref_count;
+ segment_size_type unused;
+
+ bool is_locked() const { return ref_count; }
+ void lock() { ref_count++; }
+ void unlock() { ref_count--; }
+
+ segment(block* b) : next(nullptr), blk(b), ref_count(0), unused(0) { }
+ void* at_offset(ptrdiff_t offset) {
+ return reinterpret_cast<char*>(this + 1) + offset;
+ }
+ ptrdiff_t position_in_segment(const void* ptr) const {
+ auto base = reinterpret_cast<const char*>(this + 1);
+ return static_cast<const char*>(ptr) - base;
+ }
+ static segment& from_object(const void* obj) {
+ auto ptr = reinterpret_cast<uintptr_t>(obj);
+ ptr &= ~uintptr_t(SegmentAllocator::segment_size - 1);
+ return *reinterpret_cast<segment*>(ptr);
+ }
+ };
+
+ static_assert(SegmentAllocator::segment_size > sizeof(segment), "usable segment size is too small");
+ constexpr static size_t usable_segment_size = SegmentAllocator::segment_size - sizeof(segment);
+
+ static_assert(std::ratio_greater<MemoryWasteTarget, std::ratio<0, 1>>::value, "memory waste target below or equal 0%");
+ static_assert(std::ratio_less_equal<MemoryWasteTarget, std::ratio<1, 1>>::value, " memory waste target above 100%");
+
+ // The inverse of MemoryWasteTarget is the size in segments of the
+ // smallest object that will qualify for single-object-block special
+ // case. Similarly any working blocks that reaches the size of at least
+ // MemoryWasteTarget^(-1) segments will be closed. These two properties
+ // guarantee that the biggest block that the compactor may encounter
+ // contains MemoryWasteTarget^(-1) * 2 - 2 segments. That would be the
+ // case when to a the biggest possible working block that still isn't
+ // closed is appended the largest object that didn't qualify for the
+ // special case.
+ constexpr static const size_t max_segments_in_block = (MemoryWasteTarget::den / MemoryWasteTarget::num) * 2 - 2;
+
+public:
+ class object_view {
+ protected:
+ void* _pointer;
+ size_t _size;
+ public:
+ class iterator {
+ private:
+ segment* _segment;
+ size_t _position;
+ size_t _size;
+ iovec _current;
+ public:
+ iterator() : _segment(nullptr), _position(0) { }
+ iterator(void* pointer, size_t size) : _segment(&segment::from_object(pointer)),
+ _position(_segment->position_in_segment(pointer)), _size(size) {
+ update_current();
+ }
+ iterator& operator++() {
+ auto left = _size - std::min(_size, usable_segment_size - _position);
+ _position = 0;
+ if (left) {
+ _size = left;
+ _segment = _segment->next;
+ update_current();
+ } else {
+ _segment = nullptr;
+ }
+ return *this;
+ }
+ iterator operator++(int) {
+ auto it = *this;
+ operator++();
+ return it;
+ }
+ const iovec& operator*() const {
+ return _current;
+ }
+ const iovec* operator->() const {
+ return &_current;
+ }
+ bool operator==(const iterator& other) const {
+ return _segment == other._segment && _position == other._position;
+ }
+ bool operator!=(const iterator& other) const {
+ return !(*this == other);
+ }
+ private:
+ void update_current() {
+ assert(_segment);
+ auto size = std::min(_size, usable_segment_size - _position);
+ _current = iovec { _segment->at_offset(_position), size };
+ }
+ };
+ public:
+ object_view() : _pointer(nullptr), _size(0) { }
+ object_view(const log_alloc_ptr& ptr) : _pointer(ptr.data), _size(ptr.size) { }
+ object_view(const object_view&) = default;
+ object_view(object_view&& other) : _pointer(other._pointer), _size(other._size) {
+ other._pointer = nullptr;
+ }
+ object_view& operator=(const object_view&) = default;
+ object_view& operator=(object_view&& other) {
+ _pointer = other._pointer;
+ _size = other._size;
+ other._pointer = nullptr;
+ return *this;
+ }
+
+ iterator begin() const { return iterator(_pointer, _size); }
+ iterator end() const { return iterator(); }
+
+ void* get_ptr() const { return _pointer; }
+
+ operator bool() const { return _pointer; }
+ size_t size() const { return _size; }
+
+ size_t write(size_t offset, const void* buffer, size_t length) {
+ auto ptr = static_cast<const char*>(buffer);
+ return iterate(offset, length, [ptr] (size_t pos, void* base, size_t size) {
+ ::memcpy(base, ptr + pos, size);
+ });
+ }
+ size_t read(size_t offset, void* buffer, size_t length) {
+ auto ptr = static_cast<char*>(buffer);
+ return iterate(offset, length, [ptr] (size_t pos, void* base, size_t size) {
+ ::memcpy(ptr + pos, base, size);
+ });
+ }
+ bool equal(const void* buffer, size_t length) {
+ auto ptr = static_cast<const char*>(buffer);
+ int result = 0;
+ iterate(length, [ptr, &result] (size_t pos, void* base, size_t size) {
+ if (!result) {
+ result = ::memcmp(ptr + pos, base, size);
+ }
+ });
+ return !result;
+ }
+ static void copy(const object_view& src, object_view& dst) {
+ iterate_all(src, dst, [] (void* src, void* dst, size_t size) {
+ ::memcpy(dst, src, size);
+ });
+ }
+ static bool equal(const object_view& x, const object_view& y) {
+ int result = 0;
+ iterate_all(x, y, [&result] (void* x, void* y, size_t size) {
+ if (!result) {
+ result = ::memcmp(x, y, size);
+ }
+ });
+ return !result;
+ }
+ private:
+ template<typename Func>
+ size_t iterate(size_t offset, size_t length, Func&& func) {
+ assert(_pointer);
+ size_t position = 0;
+ size_t segment_offset = 0;
+ auto it = begin();
+ for (; it != end(); ++it) {
+ position += it->iov_len;
+ if (position >= offset) {
+ segment_offset = it->iov_len + offset - position;
+ break;
+ }
+ }
+ auto done = std::min(it->iov_len - segment_offset, length);
+ func(0, static_cast<char*>(it->iov_base) + segment_offset, done);
+ ++it;
+ while (done < length && it != end()) {
+ auto size = std::min(it->iov_len, length - done);
+ func(done, it->iov_base, size);
+ done += size;
+ ++it;
+ }
+ return done;
+ }
+ template<typename Func>
+ size_t iterate(size_t length, Func&& func) {
+ assert(_pointer);
+ auto it = begin();
+ size_t done = 0;
+ while (length && it != end()) {
+ auto size = std::min(it->iov_len, length);
+ func(done, it->iov_base, size);
+ length -= size;
+ done += size;
+ ++it;
+ }
+ return done;
+ }
+ template<typename Func>
+ static void iterate_all(const object_view& x, const object_view& y, Func&& func) {
+ auto xit = x.begin();
+ auto yit = y.begin();
+ size_t xoff = 0;
+ size_t yoff = 0;
+ int result = 0;
+ while (xit != x.end() && yit != y.end() && !result) {
+ auto xlen = xit->iov_len - xoff;
+ auto ylen = yit->iov_len - yoff;
+ auto size = std::min(xlen, ylen);
+ func(static_cast<char*>(xit->iov_base) + xoff,
+ static_cast<char*>(yit->iov_base) + yoff, size);
+ xoff += size;
+ yoff += size;
+ if (xlen <= ylen) {
+ xit++;
+ xoff = 0;
+ }
+ if (ylen <= xlen) {
+ yit++;
+ yoff = 0;
+ }
+ }
+ assert(xit == x.end() && yit == y.end());
+ }
+ };
+private:
+ // Per-block metadata
+ class block : public boost::intrusive::list_base_hook<> {
+ private:
+ constexpr static uint32_t compacted = 1u << 0;
+ constexpr static uint32_t dirty = 1u << 1;
+ uint32_t _unused;
+ unsigned _flags;
+ public:
+ block() : _unused(0), _flags(0) { }
+
+ bool operator<(const block& b) const {
+ return unused() > b.unused();
+ }
+
+ bool is_compacted() const { return _flags & compacted; }
+ void set_compacted() { _flags |= compacted; }
+
+ bool is_dirty() const { return _flags & dirty; }
+ void set_dirty() { _flags |= dirty; }
+
+ size_t unused() const {
+ return is_dirty() && !is_compacted() ? _unused : 0;
+ }
+ void add_unused(size_t unused) { _unused += unused; }
+ void remove_unused(size_t unused) {
+ assert(_unused >= unused);
+ _unused -= unused;
+ }
+
+ template<typename Func>
+ void for_each_object(Func&& func) {
+ auto seg = &segment::from_object(this);
+ while (seg) {
+ auto it = seg->objects.begin();
+ while (it != seg->objects.end()) {
+ auto next = std::next(it);
+ // func() is allowed to invalidate 'it'
+ func(*seg, *it);
+ it = next;
+ }
+ seg = seg->next;
+ }
+ }
+ };
+
+ class block_container {
+ private:
+ constexpr static size_t min_size = usable_segment_size / 8;
+ constexpr static size_t max_size = max_segments_in_block * usable_segment_size;
+
+ constexpr static size_t log2floor(size_t n) {
+ return std::numeric_limits<size_t>::digits - count_leading_zeros(n) - 1;
+ }
+ constexpr static size_t log2ceil(size_t n) {
+ return std::numeric_limits<size_t>::digits - count_leading_zeros(n - 1);
+ }
+ constexpr static size_t min_shift = log2floor(min_size);
+ constexpr static size_t max_shift = log2ceil(max_size);
+ constexpr static size_t buckets = max_shift - min_shift + 1;
+
+ constexpr static unsigned npos = -1;
+ public:
+ template<typename Func>
+ void update(block& blk, Func&& func) {
+ auto old_idx = index(get_key(blk));
+ func();
+ auto new_idx = index(get_key(blk));
+ if (old_idx == new_idx) {
+ return;
+ }
+ remove(old_idx, blk);
+ insert(new_idx, blk);
+ }
+ void insert(block& blk) {
+ return insert(index(get_key(blk)), blk);
+ }
+ void remove(block& blk) {
+ return remove(index(get_key(blk)), blk);
+ }
+ bool empty() const {
+ return _not_empty.none();
+ }
+ block& get_maximum() {
+ assert(_not_empty.any());
+ auto idx = bitsets::get_last_set(_not_empty);
+ return _lists[idx].front();
+ }
+ private:
+ void insert(unsigned idx, block& blk) {
+ if (idx == npos) {
+ return;
+ }
+ _lists[idx].push_back(blk);
+ _not_empty.set(idx);
+ }
+ void remove(unsigned idx, block& blk) {
+ if (idx == npos) {
+ return;
+ }
+ auto& list = _lists[idx];
+ list.erase(list.iterator_to(blk));
+ if (list.empty()) {
+ _not_empty.reset(idx);
+ }
+ }
+ static unsigned index(size_t value) {
+ assert(value < max_size);
+ if (!value) {
+ return npos;
+ }
+ auto idx = std::max(log2floor(value) + 1, size_t(min_shift)) - min_shift;
+ return idx;
+ }
+ static size_t get_key(block& blk) {
+ return blk.unused();
+ }
+ private:
+ std::bitset<buckets> _not_empty;
+ std::array<boost::intrusive::list<block,
+ boost::intrusive::constant_time_size<false>
+ >,
+ buckets> _lists;
+ };
+private:
+ block* _block;
+ segment* _current_segment;
+ segment_size_type _current_segment_end;
+ size_t _current_size;
+ size_t _unused_memory;
+ SegmentAllocator& _alloc;
+ Evictor& _evictor;
+ condition_variable _locked_segments;
+ block_container _blocks;
+public:
+ log_alloc(SegmentAllocator& alloc, Evictor& evict)
+ : _block(nullptr)
+ , _unused_memory(0)
+ , _alloc(alloc)
+ , _evictor(evict)
+ {
+ // Reserve segments that compactor may need to use.
+ if (!_alloc.reserve(max_segments_in_block)) {
+ throw std::bad_alloc();
+ }
+ }
+ size_t unused_memory() const { return _unused_memory; }
+ static unsigned segment_count(size_t size) {
+ return (size + usable_segment_size - 1) / usable_segment_size;
+ }
+ static log_alloc_ptr* extract_hook(log_alloc_ptr* ptr) {
+ return ptr;
+ }
+ template<typename T>
+ static log_alloc_ptr* extract_hook(const T& tuple) {
+ return std::get<log_alloc_ptr*>(tuple);
+ }
+ // get_hook() is supposed to return either an pointer to log_alloc_ptr,
+ // a pair or a tuple containing such pointer. If the returned pointer is
+ // nullptr the allocation will be aborted.
+ // cont() is a continuation that will be executed after the allocation is
+ // completed (even if get_hook() retunred nullptr). Call to get_hook(),
+ // allocation and execution of continuation happen attomicaly in order to
+ // prevent race conditions with compaction.
+ template<typename GetHook, typename Cont>
+ futurize_t<std::result_of_t<Cont(std::result_of_t<GetHook()>)>>
+ alloc(size_t size, GetHook&& get_hook, Cont&& cont) {
+ using futurator = futurize<std::result_of_t<Cont(std::result_of_t<GetHook()>)>>;
+ return _alloc.when_ready([this, size, get_hook = std::forward<GetHook>(get_hook),
+ cont = std::forward<Cont>(cont)] () mutable {
+ auto count = segment_count(size + sizeof(block));
+ auto pre = _alloc.prealloc(count);
+ if (pre >= count) {
+ auto ret = get_hook();
+ auto hook = extract_hook(ret);
+ if (hook) {
+ block_append(size, *hook);
+ }
+ return futurator::apply(std::forward<Cont>(cont), std::move(ret));
+ }
+ _alloc.disable();
+ return reclaim(count).then([this, size, get_hook = std::forward<GetHook>(get_hook),
+ cont = std::forward<Cont>(cont)] () mutable {
+ _alloc.enable();
+ return alloc<GetHook, Cont>(size, std::move(get_hook), std::move(cont));
+ });
+ });
+ }
+ void free(log_alloc_ptr& ptr) {
+ auto seg = &segment::from_object(ptr.data);
+ seg->objects.erase(seg->objects.iterator_to(ptr));
+ object_view view = ptr;
+ if (!seg->blk) {
+ for (auto&& vec : view) {
+ auto& seg = segment::from_object(vec.iov_base);
+ if (!seg.is_locked()) {
+ _alloc.free(&seg);
+ } else {
+ assert(!seg.unused);
+ add_unused<true>(seg, usable_segment_size);
+ }
+ }
+ } else {
+ auto& blk = *seg->blk;
+ if (&blk == _block) {
+ close_block();
+ }
+ _blocks.update(blk, [&] {
+ blk.set_dirty();
+ for (auto&& vec : view) {
+ auto& seg = segment::from_object(vec.iov_base);
+ add_unused(seg, vec.iov_len);
+ }
+ });
+ release_segments(blk);
+ }
+ _locked_segments.notify_all();
+ }
+
+ future<> reclaim(unsigned target, unsigned attempt = 0) {
+ assert(target > 0);
+ if (_alloc.unused() >= target) {
+ return make_ready_future<>();
+ }
+ while (true) {
+ auto deficit = (target - _alloc.unused()) * usable_segment_size;
+ _evictor.evict(deficit, ++attempt);
+ auto locked = compact_pass(target);
+ if (_alloc.unused() >= target) {
+ break;
+ }
+ // No other allocations should succeed when we try to reclaim
+ // segments, so it is safe to wait for the locked segments
+ // to be released.
+ if (_alloc.reserved_deficit() && locked >= _alloc.reserved_deficit()) {
+ return _locked_segments.wait([this, target] {
+ return !_alloc.reserved_deficit();
+ }).then([this, target, attempt] {
+ assert(!_alloc.reserved_deficit());
+ return reclaim(target, attempt);
+ });
+ } else if (_alloc.unused() + locked >= target) {
+ return _locked_segments.wait([this, target] {
+ return _alloc.unused() >= target;
+ });
+ }
+ }
+ return make_ready_future<>();
+ }
+ void lock_object(object_view obj) {
+ for (auto&& vec : obj) {
+ segment::from_object(vec.iov_base).lock();
+ }
+ }
+ void unlock_object(object_view obj) {
+ unsigned empty_segments = 0;
+ for (auto&& vec : obj) {
+ auto& seg = segment::from_object(vec.iov_base);
+ assert(seg.is_locked());
+ seg.unlock();
+ if (seg.is_locked()) {
+ continue;
+ }
+ if (seg.unused == usable_segment_size
+ || (seg.blk && &seg == &segment::from_object(seg.blk)
+ && seg.unused == usable_segment_size - sizeof(block))) {
+ empty_segments++;
+ }
+ }
+ if (!empty_segments) {
+ return;
+ }
+ auto& seg = segment::from_object(obj.get_ptr());
+ if (!seg.blk) {
+ for (auto&& vec : obj) {
+ auto& seg = segment::from_object(vec.iov_base);
+ assert(!seg.ref_count);
+ remove_unused(seg, usable_segment_size);
+ _alloc.free(&seg);
+ }
+ } else if (seg.blk != _block) {
+ auto& blk = *seg.blk;
+ release_segments(blk);
+ }
+ _locked_segments.notify_all();
+ }
+private:
+ object_view block_append(size_t size, log_alloc_ptr& hook) {
+ if (can_close_block<true>(size)) {
+ auto result = append_segments(nullptr, nullptr, size);
+ auto base = result.first->at_offset(0);
+ hook.data = base;
+ hook.size = size;
+ result.first->objects.push_back(hook);
+ return object_view(hook);
+ }
+
+ if (!_block) {
+ auto ptr = static_cast<char*>(_alloc.alloc());
+ assert(ptr);
+ _block = new (ptr + sizeof(segment)) block;
+ _current_segment = new (ptr) segment(_block);
+ _current_segment_end = sizeof(block);
+ _current_size = sizeof(block);
+ }
+ assert(_current_segment_end < usable_segment_size);
+
+ auto current = _current_segment->at_offset(_current_segment_end);
+ hook.data = current;
+ hook.size = size;
+ _current_segment->objects.push_back(hook);
+
+ auto left = usable_segment_size - _current_segment_end;
+ auto this_segment = std::min(left, size);
+ assert(this_segment);
+
+ _current_segment_end += this_segment;
+ _current_size += this_segment;
+
+ auto rest = size - this_segment;
+ if (rest > 0) {
+ auto result = append_segments(_block, _current_segment, rest);
+ _current_segment = result.second;
+ _current_segment_end = rest % usable_segment_size;
+ _current_size += rest;
+ }
+
+ if (can_close_block(_current_size)) {
+ close_block();
+ }
+ return object_view(hook);
+ }
+ unsigned compact_pass(unsigned target) {
+ size_t locked = 0;
+ while (_alloc.unused() < target && !_alloc.reserved_deficit()) {
+ if (_blocks.empty()) {
+ break;
+ }
+ auto& worst = _blocks.get_maximum();
+ assert(worst.unused());
+ _blocks.remove(worst);
+ worst.set_compacted();
+
+ _alloc.enter_emergency();
+ worst.for_each_object([this, &worst] (segment& seg, log_alloc_ptr& ptr) {
+ auto old_obj = object_view(ptr);
+
+ for (auto&& vec : old_obj) {
+ auto& seg = segment::from_object(vec.iov_base);
+ add_unused(seg, vec.iov_len);
+ }
+ seg.objects.erase(seg.objects.iterator_to(ptr));
+
+ auto new_obj = block_append(ptr.size, ptr);
+ object_view::copy(old_obj, new_obj);
+ });
+ _alloc.exit_emergency();
+ auto result = release_segments(worst);
+ locked += result;
+ }
+ return locked;
+ }
+ unsigned release_segments(block& blk) {
+ unsigned locked = 0;
+ auto first_seg = &segment::from_object(&blk);
+ auto prev = first_seg;
+ auto seg = first_seg->next;
+ _blocks.update(blk, [&] {
+ while (seg) {
+ auto next = seg->next;
+ if (seg->unused == usable_segment_size && !seg->is_locked()) {
+ prev->next = next;
+ remove_unused(*seg, usable_segment_size);
+ _alloc.free(seg);
+ } else {
+ locked++;
+ prev = seg;
+ }
+ seg = next;
+ }
+ });
+ if (!first_seg->next && first_seg->unused == usable_segment_size - sizeof(block)
+ && !first_seg->is_locked()) {
+ _blocks.remove(blk);
+ remove_unused(*first_seg, usable_segment_size - sizeof(block));
+ _alloc.free(first_seg);
+ assert(!locked);
+ return 0;
+ }
+ return locked + 1;
+ }
+ template<bool SingleObjectBlock = false>
+ void add_unused(segment& seg, size_t amount) {
+ if (!SingleObjectBlock) {
+ auto& blk = *seg.blk;
+ blk.add_unused(amount);
+ }
+ seg.unused += amount;
+ assert(seg.unused <= usable_segment_size);
+ _unused_memory += amount;
+ }
+ void remove_unused(segment& seg, size_t amount) {
+ auto& blk = *seg.blk;
+ blk.remove_unused(amount);
+ assert(seg.unused >= amount);
+ seg.unused -= amount;
+ assert(_unused_memory >= amount);
+ _unused_memory -= amount;
+ }
+ void close_block() {
+ auto left = usable_segment_size - _current_segment_end;
+ add_unused(*_current_segment, left);
+ _blocks.insert(*_block);
+ _block = nullptr;
+ }
+ template<bool SingleObjectBlock = false>
+ bool can_close_block(size_t size) {
+ auto total = segment_count(size) * usable_segment_size;
+ if (SingleObjectBlock) {
+ size = std::min(size + sizeof(block), total);
+ }
+ float target = float(MemoryWasteTarget::num) / MemoryWasteTarget::den;
+ return float(total - size) < target * total;
+ }
+ std::pair<segment*, segment*> append_segments(block* blk, segment* last, size_t size) {
+ segment* head = nullptr;
+ segment* tail = last;
+ assert(!last || !last->next);
+ while (size) {
+ auto ptr = _alloc.alloc();
+ assert(ptr);
+ auto seg = new (ptr) segment(blk);
+ if (!head) {
+ head = seg;
+ }
+ if (tail) {
+ tail->next = seg;
+ }
+ tail = seg;
+
+ size -= std::min(size_t(usable_segment_size), size);
+ }
+ return std::make_pair(head, tail);
+ }
+};
+
+// Fixed-size objects allocator
+//
+// This allocator can be used for fixed size objects. It is meant to be used
+// together with log-structured allocator and acquire segments from the same
+// segment allocator.
+template<typename ObjectType, typename SegmentAllocator>
+class fixed_alloc {
+private:
+ constexpr static size_t object_size = sizeof(ObjectType);
+ constexpr static size_t segment_size = SegmentAllocator::segment_size;
+
+ struct free_object {
+ free_object* next;
+ free_object* successor() {
+ auto ptr = reinterpret_cast<char*>(this);
+ return reinterpret_cast<free_object*>(ptr + object_size);
+ }
+ };
+ static_assert(object_size >= sizeof(free_object), "object size is too small");
+
+ struct block : boost::intrusive::list_base_hook<> {
+ unsigned used;
+ free_object* next;
+ static block& from_object(void* ptr) {
+ auto addr = reinterpret_cast<uintptr_t>(ptr);
+ return *reinterpret_cast<block*>(addr & ~(segment_size - 1));
+ }
+ };
+
+ constexpr static size_t metadata_size = sizeof(block);
+ static_assert(segment_size > metadata_size + object_size, "segment is too small");
+ constexpr static unsigned objects_in_block = (segment_size - metadata_size) / object_size;
+
+ SegmentAllocator& _alloc;
+ boost::intrusive::list<block> _list;
+ std::function<future<>()> _reclaim_hook;
+public:
+ fixed_alloc(SegmentAllocator& alloc)
+ : _alloc(alloc)
+ , _reclaim_hook([] () -> future<> { throw std::bad_alloc(); })
+ { }
+ ~fixed_alloc() {
+ while (!_list.empty()) {
+ auto& blk = _list.front();
+ _list.pop_front();
+ blk.~block();
+ _alloc.free(&blk);
+ }
+ }
+ template<typename... Args>
+ future<ObjectType*> alloc(Args&&... args) {
+ if (_list.empty()) {
+ return alloc_block().then([this, args...] { return alloc(args...); });
+ }
+ auto& blk = _list.front();
+ auto ptr = blk.next;
+ blk.next = ptr->next;
+ blk.used++;
+ if (!blk.next) {
+ _list.erase(_list.iterator_to(blk));
+ }
+ assert(ptr);
+ auto obj = new (ptr) ObjectType(std::forward<Args>(args)...);
+ return make_ready_future<ObjectType*>(obj);
+ }
+ void free(ObjectType* ptr) {
+ ptr->~ObjectType();
+ auto& blk = block::from_object(ptr);
+ auto obj = reinterpret_cast<free_object*>(ptr);
+ if (!blk.next) {
+ _list.push_front(blk);
+ }
+ blk.used--;
+ obj->next = blk.next;
+ blk.next = obj;
+ if (blk.used == 0) {
+ _list.erase(_list.iterator_to(blk));
+ if (!_list.empty()) {
+ blk.~block();
+ _alloc.free(&blk);
+ } else {
+ _list.push_back(blk);
+ }
+ }
+ }
+ void set_reclaim_hook(std::function<future<>()> hook) {
+ _reclaim_hook = hook;
+ }
+private:
+ future<> alloc_block() {
+ return _alloc.when_ready([this] {
+ auto ptr = _alloc.alloc();
+ if (!ptr) {
+ return _reclaim_hook().then([this] { return alloc_block(); });
+ }
+ auto blk = new (ptr) block;
+ blk->used = 0;
+ auto obj = reinterpret_cast<free_object*>(blk + 1);
+ blk->next = obj;
+ for (unsigned i = 0; i < objects_in_block - 1; i++) {
+ obj->next = obj->successor();
+ obj = obj->next;
+ }
+ obj->next = nullptr;
+ _list.push_back(*blk);
+ return make_ready_future<>();
+ });
+ }

Paweł Dziepak

<pdziepak@quarnos.org>
unread,
May 7, 2015, 12:21:27 PM5/7/15
to seastar-dev@googlegroups.com, Paweł Dziepak
Signed-off-by: Paweł Dziepak <pdzi...@quarnos.org>
---
apps/flashcache/flashcache.cc | 862 +++++++++++++++++++++++++++++-------------
1 file changed, 598 insertions(+), 264 deletions(-)

diff --git a/apps/flashcache/flashcache.cc b/apps/flashcache/flashcache.cc
index 7afb29d..3facc6b 100644
--- a/apps/flashcache/flashcache.cc
+++ b/apps/flashcache/flashcache.cc
@@ -36,7 +36,7 @@
#include "core/distributed.hh"
#include "core/vector-data-sink.hh"
#include "core/bitops.hh"
-#include "core/slab.hh"
+#include "core/log_alloc.hh"
#include "core/align.hh"
#include "net/api.hh"
#include "net/packet-data-source.hh"
@@ -54,10 +54,8 @@ namespace bi = boost::intrusive;

namespace memcache {

-static constexpr double default_slab_growth_factor = 1.25;
-static constexpr uint64_t default_slab_page_size = 1UL*MB;
-static constexpr uint64_t default_per_cpu_slab_size = 0UL; // zero means reclaimer is enabled.
-static __thread slab_allocator<item>* slab;
+typedef segment_allocator<16 * 1024> segment_allocator_type;
+static __thread segment_allocator_type* segalloc;

template<typename T>
using optional = boost::optional<T>;
@@ -78,7 +76,7 @@ struct expiration {
}
}

- bool ever_expires() {
+ bool ever_expires() const {
return _time;
}

@@ -87,7 +85,7 @@ struct expiration {
}
};

-class item : public slab_item_base {
+class item {
public:
using version_type = uint64_t;
using time_point = clock_type::time_point;
@@ -99,41 +97,25 @@ private:
version_type _version;
hook_type _cache_link;
bi::list_member_hook<> _timer_link;
- size_t _key_hash;
+ bi::list_member_hook<> _lru_link;
expiration _expiry;
+ size_t _key_hash;
+ log_alloc_ptr _key;
uint32_t _value_size;
- uint32_t _slab_page_index;
uint16_t _ref_count;
- uint8_t _key_size;
uint8_t _ascii_prefix_size;
- char _data[]; // layout: data=key, (data+key_size)=ascii_prefix, (data+key_size+ascii_prefix_size)=value.
+ log_alloc_ptr _value;
friend class cache;
public:
- item(uint32_t slab_page_index, item_key&& key, sstring&& ascii_prefix,
- sstring&& value, expiration expiry, version_type version = 1)
- : _version(version)
- , _key_hash(key.hash())
- , _expiry(expiry)
- , _value_size(value.size())
- , _slab_page_index(slab_page_index)
- , _ref_count(0U)
- , _key_size(key.key().size())
- , _ascii_prefix_size(ascii_prefix.size())
- {
- assert(_key_size <= std::numeric_limits<uint8_t>::max());
- assert(_ascii_prefix_size <= std::numeric_limits<uint8_t>::max());
- // storing key
- memcpy(_data, key.key().c_str(), _key_size);
- // storing ascii_prefix
- memcpy(_data + align_up(_key_size, field_alignment), ascii_prefix.c_str(), _ascii_prefix_size);
- // storing value
- memcpy(_data + align_up(_key_size, field_alignment) + align_up(_ascii_prefix_size, field_alignment),
- value.c_str(), _value_size);
- }
-
+ item() : _version(1), _ref_count(0U) { }
+ ~item();
item(const item&) = delete;
item(item&&) = delete;

+ void set_expiry(expiration expiry) {
+ _expiry = expiry;
+ }
+
clock_type::time_point get_timeout() {
return _expiry.to_time_point();
}
@@ -142,71 +124,48 @@ public:
return _version;
}

- const std::experimental::string_view key() const {
- return std::experimental::string_view(_data, _key_size);
+ bool erased() const {
+ return !_value;
}

- const std::experimental::string_view ascii_prefix() const {
- const char *p = _data + align_up(_key_size, field_alignment);
- return std::experimental::string_view(p, _ascii_prefix_size);
+ const log_alloc_ptr& key() const { return _key; }
+ log_alloc_ptr& reset_key() {
+ assert(!_key);
+ return _key;
}
+ void set_key_hash(size_t hash) { _key_hash = hash; }
+
+ const sstring ascii_prefix() const;

- const std::experimental::string_view value() const {
- const char *p = _data + align_up(_key_size, field_alignment) +
- align_up(_ascii_prefix_size, field_alignment);
- return std::experimental::string_view(p, _value_size);
+ const log_alloc_ptr& value() const { return _value; }
+ log_alloc_ptr& reset_value();
+ void set_value_size(size_t value_size, size_t prefix_size) {
+ _value_size = value_size;
+ _ascii_prefix_size = prefix_size;
}

size_t key_size() const {
- return _key_size;
+ return _key.size;
}

size_t ascii_prefix_size() const {
return _ascii_prefix_size;
}
-
+ size_t value_offset() const {
+ return _ascii_prefix_size;
+ }
size_t value_size() const {
return _value_size;
}

- optional<uint64_t> data_as_integral() {
- auto str = value().data();
- if (str[0] == '-') {
- return {};
- }
-
- auto len = _value_size;
-
- // Strip trailing space
- while (len && str[len - 1] == ' ') {
- len--;
- }
-
- try {
- return {boost::lexical_cast<uint64_t>(str, len)};
- } catch (const boost::bad_lexical_cast& e) {
- return {};
- }
- }
+ optional<uint64_t> data_as_integral();

// needed by timer_set
bool cancel() {
return false;
}

- // Methods required by slab allocator.
- uint32_t get_slab_page_index() const {
- return _slab_page_index;
- }
- bool is_unlocked() const {
- return _ref_count == 1;
- }
-
- friend bool operator==(const item &a, const item &b) {
- return (a._key_hash == b._key_hash) &&
- (a._key_size == b._key_size) &&
- (memcmp(a._data, b._data, a._key_size) == 0);
- }
+ friend bool operator==(const item &a, const item &b);

friend std::size_t hash_value(const item &i) {
return i._key_hash;
@@ -215,31 +174,163 @@ public:
friend inline void intrusive_ptr_add_ref(item* it) {
assert(it->_ref_count >= 0);
++it->_ref_count;
- if (it->_ref_count == 2) {
- slab->lock_item(it);
+ }
+
+ friend void intrusive_ptr_release(item* it);
+
+ friend class item_key_cmp;
+};
+
+typedef fixed_alloc<item, segment_allocator_type> fixed_allocator_type;
+static __thread fixed_allocator_type* fixalloc;
+
+void evict_item(item&);
+
+template<typename EvictionStep = std::ratio<1, 10>>
+class lru_evictor {
+public:
+ void add(item& it) {
+ intrusive_ptr_add_ref(&it);
+ _lru.push_front(it);
+ }
+ void remove(item& it) {
+ _lru.erase(_lru.iterator_to(it));
+ intrusive_ptr_release(&it);
+ }
+ void touch(item& it) {
+ _lru.erase(_lru.iterator_to(it));
+ _lru.push_front(it);
+ }
+ void evict(size_t needed, unsigned attempt) {
+ auto total = total_memory();
+ assert(total >= needed);
+ auto step = total * EvictionStep::num / EvictionStep::den * attempt;
+ auto target = std::max(step, needed);
+ while (!_lru.empty() && unused_memory() < target) {
+ auto& obj = _lru.back();
+ _lru.pop_back();
+ evict_item(obj);
+ intrusive_ptr_release(&obj);
+ }
+ }
+ void evict_all() {
+ while (!_lru.empty()) {
+ auto& obj = _lru.back();
+ _lru.pop_back();
+ evict_item(obj);
+ intrusive_ptr_release(&obj);
}
}
+private:
+ size_t total_memory() const;
+ size_t unused_memory() const;
+private:
+ bi::list<item,
+ bi::member_hook<item, bi::list_member_hook<>, &item::_lru_link>
+ > _lru;
+};

- friend inline void intrusive_ptr_release(item* it) {
- --it->_ref_count;
- if (it->_ref_count == 1) {
- slab->unlock_item(it);
- } else if (it->_ref_count == 0) {
- slab->free(it);
+typedef lru_evictor<> evictor_type;
+static __thread evictor_type* evictor;
+typedef log_alloc<segment_allocator_type, evictor_type> log_allocator_type;
+static __thread log_allocator_type* logalloc;
+
+template<typename EvictionStep>
+inline size_t lru_evictor<EvictionStep>::total_memory() const {
+ return segalloc->total() * segalloc->segment_size;
+}
+
+template<typename EvictionStep>
+inline size_t lru_evictor<EvictionStep>::unused_memory() const {
+ return segalloc->unused() * segalloc->segment_size + logalloc->unused_memory();
+}
+
+item::~item()
+{
+ if (_value) {
+ logalloc->free(_value);
+ }
+ if (_key) {
+ logalloc->free(_key);
+ }
+}
+
+log_alloc_ptr& item::reset_value()
+{
+ if (_value) {
+ logalloc->free(_value);
+ _version++;
+ } else {
+ _version = 0;
+ }
+ _value = log_alloc_ptr();
+ return _value;
+}
+
+const sstring item::ascii_prefix() const
+{
+ sstring str(sstring::initialized_later(), _ascii_prefix_size);
+ auto obj = log_allocator_type::object_view(_value);
+ auto it = str.begin();
+ size_t left = _ascii_prefix_size;
+ for (auto&& v : obj) {
+ auto size = std::min(left, v.iov_len);
+ it = std::copy(static_cast<char*>(v.iov_base),
+ static_cast<char*>(v.iov_base) + size, it);
+ left -= size;
+ if (!left) {
+ break;
}
- assert(it->_ref_count >= 0);
}
+ return str;
+}

- friend class item_key_cmp;
-};
+optional<uint64_t> item::data_as_integral()
+{
+ std::unique_ptr<char[]> temp(new char[_value_size]);
+ const char* str = "";
+ if (_value) {
+ auto obj = log_allocator_type::object_view(_value);
+ auto ret = obj.read(value_offset(), temp.get(), _value_size);
+ assert(ret == _value_size);
+ str = temp.get();
+ } else {
+ assert(!_value_size);
+ }
+
+ if (str[0] == '-') {
+ return {};
+ }
+
+ auto len = _value_size;
+
+ // Strip trailing space
+ while (len && str[len - 1] == ' ') {
+ len--;
+ }
+
+ try {
+ return {boost::lexical_cast<uint64_t>(str, len)};
+ } catch (const boost::bad_lexical_cast& e) {
+ return {};
+ }
+}
+
+bool operator==(const item &a, const item &b) {
+ return (a._key_hash == b._key_hash) &&
+ (a.key_size() == b.key_size()) &&
+ (!log_allocator_type::object_view::equal(a.key(), b.key()));
+}

struct item_key_cmp
{
private:
bool compare(const item_key& key, const item& it) const {
- return (it._key_hash == key.hash()) &&
- (it._key_size == key.key().size()) &&
- (memcmp(it._data, key.key().c_str(), it._key_size) == 0);
+ if (it._key_hash != key.hash() || it.key_size() != key.key().size()) {
+ return false;
+ }
+ auto view = log_allocator_type::object_view(it.key());
+ return view.equal(key.key().c_str(), key.key().size());
}
public:
bool operator()(const item_key& key, const item& it) const {
@@ -253,6 +344,70 @@ public:

using item_ptr = foreign_ptr<boost::intrusive_ptr<item>>;

+class item_data {
+public:
+ using element_type = item_data;
+ item_data() : _value_size(0), _ascii_prefix_size(0) { }
+ item_data(boost::intrusive_ptr<item> it) : _item(it),
+ _value_size(it->value_size()), _ascii_prefix_size(it->ascii_prefix_size()),
+ _value(it->value()) {
+ assert(_value);
+ logalloc->lock_object(_value);
+ }
+ item_data(const item_data&) = delete;
+ item_data(item_data&&) = default;
+ item_data& operator=(const item_data&) = delete;
+ item_data& operator=(item_data&&) = default;
+ ~item_data() {
+ if (_value) {
+ logalloc->unlock_object(_value);
+ }
+ }
+ operator bool() const { return static_cast<bool>(_item); }
+ item& parent() const { return *_item; }
+ log_allocator_type::object_view key() const {
+ return _item->key();
+ }
+ log_allocator_type::object_view value() const { return _value; }
+ size_t value_size() const { return _value_size; }
+ size_t ascii_prefix_size() const { return _ascii_prefix_size; }
+
+ template<typename Func>
+ void iterate_prefix(Func&& func) {
+ auto prefix_size = ascii_prefix_size();
+ for (auto&& vec : _value) {
+ auto size = std::min(prefix_size, vec.iov_len);
+ func(vec.iov_base, size);
+ prefix_size -= size;
+ if (!prefix_size) {
+ break;
+ }
+ }
+ }
+ template<typename Func>
+ void iterate_value(Func&& func) {
+ auto prefix_size = ascii_prefix_size();
+ for (auto&& vec : _value) {
+ if (prefix_size < vec.iov_len) {
+ auto base = static_cast<char*>(vec.iov_base) + prefix_size;
+ func(base, vec.iov_len - prefix_size);
+ prefix_size = 0;
+ } else {
+ prefix_size -= vec.iov_len;
+ }
+ }
+ }
+
+ item_data& operator*() const { return *const_cast<item_data*>(this); }
+private:
+ boost::intrusive_ptr<item> _item;
+ size_t _value_size;
+ size_t _ascii_prefix_size;
+ log_allocator_type::object_view _value;
+};
+
+using item_data_ptr = foreign_ptr<item_data>;
+
struct cache_stats {
size_t _get_hits {};
size_t _get_misses {};
@@ -324,6 +479,10 @@ struct item_insertion_data {
expiration expiry;
};

+class cache;
+
+__thread cache* cache_ptr;
+
class cache {
private:
using cache_type = bi::unordered_set<item,
@@ -340,49 +499,35 @@ private:
timer<> _timer;
cache_stats _stats;
timer<> _flush_timer;
+ std::unique_ptr<memory::reclaimer> _reclaimer;
private:
size_t item_size(item& item_ref) {
- constexpr size_t field_alignment = alignof(void*);
- return sizeof(item) +
- align_up(item_ref.key_size(), field_alignment) +
- align_up(item_ref.ascii_prefix_size(), field_alignment) +
- item_ref.value_size();
- }
-
- size_t item_size(item_insertion_data& insertion) {
- constexpr size_t field_alignment = alignof(void*);
- auto size = sizeof(item) +
- align_up(insertion.key.key().size(), field_alignment) +
- align_up(insertion.ascii_prefix.size(), field_alignment) +
- insertion.data.size();
-#ifdef __DEBUG__
- static bool print_item_footprint = true;
- if (print_item_footprint) {
- print_item_footprint = false;
- std::cout << __FUNCTION__ << ": " << size << "\n";
- std::cout << "sizeof(item) " << sizeof(item) << "\n";
- std::cout << "key.size " << insertion.key.key().size() << "\n";
- std::cout << "value.size " << insertion.data.size() << "\n";
- std::cout << "ascii_prefix.size " << insertion.ascii_prefix.size() << "\n";
- }
-#endif
- return size;
+ return sizeof(item) + item_ref.key_size() + item_ref.ascii_prefix_size()
+ + item_ref.value_size();
}

- template <bool IsInCache = true, bool IsInTimerList = true, bool Release = true>
+ enum class in_timer_list {
+ no,
+ yes,
+ };
+ enum class in_lru_list {
+ no,
+ yes,
+ };
+ template <in_timer_list timer = in_timer_list::yes, in_lru_list lru = in_lru_list::yes>
void erase(item& item_ref) {
- if (IsInCache) {
- _cache.erase(_cache.iterator_to(item_ref));
+ if (item_ref.erased()) {
+ return;
}
- if (IsInTimerList) {
+ if (timer == in_timer_list::yes) {
if (item_ref._expiry.ever_expires()) {
_alive.remove(item_ref);
}
}
+ item_ref.reset_value();
_stats._bytes -= item_size(item_ref);
- if (Release) {
- // memory used by item shouldn't be freed when slab is replacing it with another item.
- intrusive_ptr_release(&item_ref);
+ if (lru == in_lru_list::yes) {
+ evictor->remove(item_ref);
}
}

@@ -391,7 +536,7 @@ private:
while (!exp.empty()) {
auto item = &*exp.begin();
exp.pop_front();
- erase<true, false>(*item);
+ erase<in_timer_list::no>(*item);
_stats._expired++;
}
_timer.arm(_alive.get_next_timeout());
@@ -399,45 +544,11 @@ private:

inline
cache_iterator find(const item_key& key) {
- return _cache.find(key, std::hash<item_key>(), item_key_cmp());
- }
-
- template <typename Origin>
- inline
- cache_iterator add_overriding(cache_iterator i, item_insertion_data& insertion) {
- auto& old_item = *i;
- uint64_t old_item_version = old_item._version;
-
- erase(old_item);
-
- size_t size = item_size(insertion);
- auto new_item = slab->create(size, Origin::move_if_local(insertion.key), Origin::move_if_local(insertion.ascii_prefix),
- Origin::move_if_local(insertion.data), insertion.expiry, old_item_version + 1);
- intrusive_ptr_add_ref(new_item);
-
- auto insert_result = _cache.insert(*new_item);
- assert(insert_result.second);
- if (insertion.expiry.ever_expires() && _alive.insert(*new_item)) {
- _timer.rearm(new_item->get_timeout());
- }
- _stats._bytes += size;
- return insert_result.first;
- }
-
- template <typename Origin>
- inline
- void add_new(item_insertion_data& insertion) {
- size_t size = item_size(insertion);
- auto new_item = slab->create(size, Origin::move_if_local(insertion.key), Origin::move_if_local(insertion.ascii_prefix),
- Origin::move_if_local(insertion.data), insertion.expiry);
- intrusive_ptr_add_ref(new_item);
- auto& item_ref = *new_item;
- _cache.insert(item_ref);
- if (insertion.expiry.ever_expires() && _alive.insert(item_ref)) {
- _timer.rearm(item_ref.get_timeout());
+ auto it = _cache.find(key, std::hash<item_key>(), item_key_cmp());
+ if (it != _cache.end() && !it->erased()) {
+ evictor->touch(*it);
}
- _stats._bytes += size;
- maybe_rehash();
+ return !it->erased() ? it : _cache.end();
}

void maybe_rehash() {
@@ -455,35 +566,146 @@ private:
_resize_up_threshold = _cache.bucket_count() * load_factor;
}
}
+
+ void set_item(item& item_ref, item_insertion_data&& insert, log_allocator_type::object_view&& data) {
+ data.write(0, insert.ascii_prefix.c_str(), insert.ascii_prefix.size());
+ data.write(insert.ascii_prefix.size(), insert.data.c_str(), insert.data.size());
+ item_ref.set_value_size(insert.data.size(), insert.ascii_prefix.size());
+
+ if (item_ref._expiry.ever_expires()) {
+ _alive.remove(item_ref);
+ }
+ if (insert.expiry.ever_expires() && _alive.insert(item_ref)) {
+ _timer.rearm(item_ref.get_timeout());
+ }
+ }
+
+ enum class can_overwrite {
+ no,
+ yes,
+ };
+ enum class can_create {
+ no,
+ yes,
+ };
+
+ // get a reference to item or create a new one in erased state
+ // erased state -- the item exists and is in the cache altough no
+ // value is associated with it, it is kind of intermediate state
+ // and it indicates that some operation on this item is in progres
+ // get requests ignore erased items
+ template<can_create create>
+ future<std::pair<bool, boost::intrusive_ptr<item>>> get_item(item_key&& key) {
+ auto it = _cache.find(key, std::hash<item_key>(), item_key_cmp());
+ if (it != _cache.end()) {
+ boost::intrusive_ptr<item> ptr(&*it);
+ return make_ready_future<std::pair<bool, boost::intrusive_ptr<item>>>(std::make_pair(!it->erased(), ptr));
+ }
+ if (create == can_create::no) {
+ return make_ready_future<std::pair<bool, boost::intrusive_ptr<item>>>(std::make_pair(false, nullptr));
+ }
+ return fixalloc->alloc().then([this, key = std::move(key)] (item* new_item) mutable {
+ auto key_size = key.key().size();
+ return logalloc->alloc(key_size, [new_item] {
+ return &new_item->reset_key();
+ }, [this, new_item, key = std::move(key)] (log_alloc_ptr* key_ptr) {
+ // check again, the item may have been created in the meantime
+ auto it = _cache.find(key, std::hash<item_key>(), item_key_cmp());
+ if (it != _cache.end()) {
+ fixalloc->free(new_item);
+ boost::intrusive_ptr<item> ptr(&*it);
+ return std::make_pair(!it->erased(), ptr);
+ }
+
+ log_allocator_type::object_view key_buffer(*key_ptr);
+ key_buffer.write(0, key.key().c_str(), key.key().size());
+ new_item->set_key_hash(key.hash());
+
+ boost::intrusive_ptr<item> ptr(new_item);
+ _cache.insert(*new_item);
+ maybe_rehash();
+ return std::make_pair(false, ptr);
+ });
+ });
+ }
+
+ // can_ovewrtie decides whether the addition can succeed if the item with a
+ // given key already exist. can_create decides whether the call to add()
+ // can create a new item if there isn't any with a specified key.
+ template<typename Origin, can_overwrite overwrite, can_create create>
+ future<std::pair<bool, bool>> add(item_insertion_data& insertion) {
+ static_assert(overwrite == can_overwrite::yes || create == can_create::yes, "impossible item add conditions");
+ auto key = Origin::move_if_local(insertion.key);
+ return get_item<create>(std::move(key)).then([this, insert = Origin::move_if_local(insertion)] (std::pair<bool, boost::intrusive_ptr<item>> ret) mutable {
+ if (!ret.second || (overwrite == can_overwrite::no && ret.first && !ret.second->erased())) {
+ return make_ready_future<std::pair<bool, bool>>(std::make_pair(false, false));
+ }
+ auto value_size = insert.data.size() + insert.ascii_prefix.size();
+ return logalloc->alloc(value_size, [this, ptr = ret.second] () -> log_alloc_ptr* {
+ if (ptr->erased()) {
+ if (create == can_create::no) {
+ return nullptr;
+ }
+ evictor->add(*ptr);
+ } else {
+ evictor->touch(*ptr);
+ _stats._bytes -= item_size(*ptr);
+ }
+ return &ptr->reset_value();
+ }, [this, ret, insert = std::move(insert)] (log_alloc_ptr* ptr) {
+ if (!ptr) {
+ return std::make_pair(false, false);
+ }
+ log_allocator_type::object_view value(*ptr);
+ value.write(0, insert.ascii_prefix.c_str(), insert.ascii_prefix.size());
+ value.write(insert.ascii_prefix.size(), insert.data.c_str(), insert.data.size());
+
+ auto& item_ref = *ret.second;
+ item_ref.set_value_size(insert.data.size(), insert.ascii_prefix.size());
+ item_ref.set_expiry(insert.expiry);
+ if (insert.expiry.ever_expires() && _alive.insert(item_ref)) {
+ _timer.rearm(item_ref.get_timeout());
+ }
+ _stats._bytes += item_size(item_ref);
+ return std::make_pair(true, ret.first);
+ });
+ });
+ }
public:
- cache(uint64_t per_cpu_slab_size, uint64_t slab_page_size)
+ cache()
: _buckets(new cache_type::bucket_type[initial_bucket_count])
, _cache(cache_type::bucket_traits(_buckets, initial_bucket_count))
{
_timer.set_callback([this] { expire(); });
_flush_timer.set_callback([this] { flush_all(); });

- // initialize per-thread slab allocator.
- slab = new slab_allocator<item>(default_slab_growth_factor, per_cpu_slab_size, slab_page_size,
- [this](item& item_ref) { erase<true, true, false>(item_ref); _stats._evicted++; });
-#ifdef __DEBUG__
- static bool print_slab_classes = true;
- if (print_slab_classes) {
- print_slab_classes = false;
- slab->print_slab_classes();
- }
-#endif
+ cache_ptr = this;
+ segalloc = new segment_allocator_type;
+ evictor = new evictor_type;
+ logalloc = new log_allocator_type(*segalloc, *evictor);
+ fixalloc = new fixed_allocator_type(*segalloc);
+ fixalloc->set_reclaim_hook([] { return logalloc->reclaim(1); });
+ _reclaimer.reset(new memory::reclaimer([this] {
+ logalloc->reclaim(1);
+ segalloc->free_memory();
+ }));
}

~cache() {
- flush_all();
+ flush_all();
+ delete evictor;
+ delete fixalloc;
+ delete logalloc;
+ delete segalloc;
+ }
+
+ void remove_from_hashtable(item& item_ref) {
+ _cache.erase(_cache.iterator_to(item_ref));
}

void flush_all() {
_flush_timer.cancel();
- _cache.erase_and_dispose(_cache.begin(), _cache.end(), [this] (item* it) {
- erase<false, true>(*it);
- });
+ evictor->evict_all();
}

void flush_at(clock_type::time_point time_point) {
@@ -491,43 +713,36 @@ public:
}

template <typename Origin = local_origin_tag>
- bool set(item_insertion_data& insertion) {
- auto i = find(insertion.key);
- if (i != _cache.end()) {
- add_overriding<Origin>(i, insertion);
- _stats._set_replaces++;
- return true;
- } else {
- add_new<Origin>(insertion);
- _stats._set_adds++;
- return false;
- }
+ future<bool> set(item_insertion_data& insertion) {
+ return add<Origin, can_overwrite::yes, can_create::yes>(insertion).then([this] (auto ret) {
+ if (ret.second) {
+ _stats._set_replaces++;
+ } else {
+ _stats._set_adds++;
+ }
+ return make_ready_future<bool>(ret.second);
+ });
}

template <typename Origin = local_origin_tag>
- bool add(item_insertion_data& insertion) {
- auto i = find(insertion.key);
- if (i != _cache.end()) {
- return false;
- }
-
- _stats._set_adds++;
- add_new<Origin>(insertion);
- return true;
+ future<bool> add(item_insertion_data& insertion) {
+ return add<Origin, can_overwrite::no, can_create::yes>(insertion).then([this] (auto ret) {
+ if (ret.first) {
+ _stats._set_adds++;
+ }
+ return make_ready_future<bool>(ret.first);
+ });
}

template <typename Origin = local_origin_tag>
- bool replace(item_insertion_data& insertion) {
- auto i = find(insertion.key);
- if (i == _cache.end()) {
- return false;
- }
-
- _stats._set_replaces++;
- add_overriding<Origin>(i, insertion);
- return true;
+ future<bool> replace(item_insertion_data& insertion) {
+ return add<Origin, can_overwrite::yes, can_create::no>(insertion).then([this] (auto ret) {
+ if (ret.first) {
+ _stats._set_replaces++;
+ }
+ return make_ready_future<bool>(ret.first);
+ });
}
-
bool remove(const item_key& key) {
auto i = find(key);
if (i == _cache.end()) {
@@ -540,32 +755,58 @@ public:
return true;
}

- item_ptr get(const item_key& key) {
+ void evict(item& item_ref) {
+ erase<in_timer_list::yes, in_lru_list::no>(item_ref);
+ _stats._evicted++;
+ }
+
+ future<item_data_ptr> get(const item_key& key) {
auto i = find(key);
if (i == _cache.end()) {
_stats._get_misses++;
- return nullptr;
+ return make_ready_future<item_data_ptr>();
}
_stats._get_hits++;
auto& item_ref = *i;
- return item_ptr(&item_ref);
+ return make_ready_future<item_data_ptr>(item_data(&item_ref));
}

template <typename Origin = local_origin_tag>
- cas_result cas(item_insertion_data& insertion, item::version_type version) {
+ future<cas_result> cas(item_insertion_data& insertion, item::version_type version) {
auto i = find(insertion.key);
if (i == _cache.end()) {
_stats._cas_misses++;
- return cas_result::not_found;
+ return make_ready_future<cas_result>(cas_result::not_found);
}
auto& item_ref = *i;
if (item_ref._version != version) {
_stats._cas_badval++;
- return cas_result::bad_version;
+ return make_ready_future<cas_result>(cas_result::bad_version);
}
- _stats._cas_hits++;
- add_overriding<Origin>(i, insertion);
- return cas_result::stored;
+ boost::intrusive_ptr<item> ptr = &item_ref;
+ auto insert = Origin::move_if_local(insertion);
+ auto data_size = insert.data.size() + insert.ascii_prefix.size();
+ return logalloc->alloc(data_size, [this, ptr, version] () -> log_alloc_ptr* {
+ if (ptr->erased() || ptr->_version != version) {
+ return nullptr;
+ }
+ _stats._bytes -= item_size(*ptr);
+ return &ptr->reset_value();
+ }, [this, ptr, version, insert = std::move(insert)] (log_alloc_ptr* data) mutable {
+ if (!data) {
+ if (ptr->_version - 1 != version) {
+ _stats._cas_badval++;
+ return make_ready_future<cas_result>(cas_result::bad_version);
+ } else {
+ _stats._cas_misses++;
+ return make_ready_future<cas_result>(cas_result::not_found);
+ }
+ }
+ set_item(*ptr, std::move(insert), log_allocator_type::object_view(*data));
+ _stats._cas_hits++;
+ _stats._bytes += item_size(*ptr);
+ return make_ready_future<cas_result>(cas_result::stored);
+ });
}

size_t size() {
@@ -582,49 +823,123 @@ public:
}

template <typename Origin = local_origin_tag>
- std::pair<item_ptr, bool> incr(item_key& key, uint64_t delta) {
+ future<std::pair<item_data_ptr, bool>> incr(item_key& key, uint64_t delta) {
auto i = find(key);
if (i == _cache.end()) {
_stats._incr_misses++;
- return {item_ptr{}, false};
+ std::pair<item_data_ptr, bool> pair = {item_data{}, false};
+ return make_ready_future<std::pair<item_data_ptr, bool>>(std::move(pair));
}
auto& item_ref = *i;
- _stats._incr_hits++;
auto value = item_ref.data_as_integral();
if (!value) {
- return {boost::intrusive_ptr<item>(&item_ref), false};
+ _stats._incr_hits++;
+ std::pair<item_data_ptr, bool> pair = {item_data(&item_ref), false};
+ return make_ready_future<std::pair<item_data_ptr, bool>>(std::move(pair));
}
item_insertion_data insertion {
.key = Origin::move_if_local(key),
- .ascii_prefix = sstring(item_ref.ascii_prefix().data(), item_ref.ascii_prefix_size()),
+ .ascii_prefix = item_ref.ascii_prefix(),
.data = to_sstring(*value + delta),
.expiry = item_ref._expiry
};
- i = add_overriding<local_origin_tag>(i, insertion);
- return {boost::intrusive_ptr<item>(&*i), true};
+ boost::intrusive_ptr<item> ptr = &item_ref;
+ auto data_size = insertion.data.size() + insertion.ascii_prefix.size();
+ return logalloc->alloc(data_size, [this, ptr, old_value = *value] {
+ if (ptr->erased()) {
+ return std::pair<log_alloc_ptr*,
+ optional<uint64_t>>(nullptr,
+ optional<uint64_t>());
+ }
+ auto value = ptr->data_as_integral();
+ if (!value || *value != old_value) {
+ return std::pair<log_alloc_ptr*,
+ optional<uint64_t>>(nullptr, value);
+ }
+ _stats._bytes -= item_size(*ptr);
+ return std::make_pair(&ptr->reset_value(), value);
+ }, [this, ptr, delta, insert = std::move(insertion),
+ old_value = *value] (std::pair<log_alloc_ptr*, optional<uint64_t>> ret) mutable {
+ if (ptr->erased()) {
+ _stats._incr_misses++;
+ std::pair<item_data_ptr, bool> pair = {item_data{}, false};
+ return make_ready_future<std::pair<item_data_ptr, bool>>(std::move(pair));
+ }
+ if (!ret.second) {
+ _stats._incr_hits++;
+ std::pair<item_data_ptr, bool> pair = {item_data(ptr), false};
+ return make_ready_future<std::pair<item_data_ptr, bool>>(std::move(pair));
+ }
+ if (*ret.second != old_value) {
+ return incr(insert.key, delta);
+ }
+ assert(ret.first);
+ set_item(*ptr, std::move(insert), log_allocator_type::object_view(*ret.first));
+ _stats._incr_hits++;
+ _stats._bytes += item_size(*ptr);
+ std::pair<item_data_ptr, bool> pair = {item_data(ptr), true};
+ return make_ready_future<std::pair<item_data_ptr, bool>>(std::move(pair));
+ });
}

template <typename Origin = local_origin_tag>
- std::pair<item_ptr, bool> decr(item_key& key, uint64_t delta) {
+ future<std::pair<item_data_ptr, bool>> decr(item_key& key, uint64_t delta) {
auto i = find(key);
if (i == _cache.end()) {
_stats._decr_misses++;
- return {item_ptr{}, false};
+ std::pair<item_data_ptr, bool> pair = {item_data{}, false};
+ return make_ready_future<std::pair<item_data_ptr, bool>>(std::move(pair));
}
auto& item_ref = *i;
- _stats._decr_hits++;
auto value = item_ref.data_as_integral();
if (!value) {
- return {boost::intrusive_ptr<item>(&item_ref), false};
+ _stats._decr_hits++;
+ std::pair<item_data_ptr, bool> pair = {item_data(&item_ref), false};
+ return make_ready_future<std::pair<item_data_ptr, bool>>(std::move(pair));
}
item_insertion_data insertion {
.key = Origin::move_if_local(key),
- .ascii_prefix = sstring(item_ref.ascii_prefix().data(), item_ref.ascii_prefix_size()),
+ .ascii_prefix = item_ref.ascii_prefix(),
.data = to_sstring(*value - std::min(*value, delta)),
.expiry = item_ref._expiry
};
- i = add_overriding<local_origin_tag>(i, insertion);
- return {boost::intrusive_ptr<item>(&*i), true};
+ boost::intrusive_ptr<item> ptr = &item_ref;
+ auto data_size = insertion.data.size() + insertion.ascii_prefix.size();
+ return logalloc->alloc(data_size, [this, ptr, old_value = *value] {
+ if (ptr->erased()) {
+ return std::pair<log_alloc_ptr*,
+ optional<uint64_t>>(nullptr,
+ optional<uint64_t>());
+ }
+ auto value = ptr->data_as_integral();
+ if (!value || *value != old_value) {
+ return std::pair<log_alloc_ptr*,
+ optional<uint64_t>>(nullptr, value);
+ }
+ _stats._bytes -= item_size(*ptr);
+ return std::make_pair(&ptr->reset_value(), value);
+ }, [this, ptr, delta, insert = std::move(insertion),
+ old_value = *value] (std::pair<log_alloc_ptr*, optional<uint64_t>> ret) mutable {
+ if (ptr->erased()) {
+ _stats._decr_misses++;
+ std::pair<item_data_ptr, bool> pair = {item_data{}, false};
+ return make_ready_future<std::pair<item_data_ptr, bool>>(std::move(pair));
+ }
+ if (!ret.second) {
+ _stats._decr_hits++;
+ std::pair<item_data_ptr, bool> pair = {item_data(ptr), false};
+ return make_ready_future<std::pair<item_data_ptr, bool>>(std::move(pair));
+ }
+ if (*ret.second != old_value) {
+ return decr(insert.key, delta);
+ }
+ assert(ret.first);
+ set_item(*ptr, std::move(insert), log_allocator_type::object_view(*ret.first));
+ _stats._decr_hits++;
+ _stats._bytes += item_size(*ptr);
+ std::pair<item_data_ptr, bool> pair = {item_data(ptr), true};
+ return make_ready_future<std::pair<item_data_ptr, bool>>(std::move(pair));
+ });
}

std::pair<unsigned, foreign_ptr<lw_shared_ptr<std::string>>> print_hash_stats() {
@@ -671,6 +986,20 @@ public:
future<> stop() { return make_ready_future<>(); }
};

+inline void intrusive_ptr_release(item* it) {
+ --it->_ref_count;
+ assert(it->_ref_count >= 0);
+ if (it->_ref_count == 0) {
+ cache_ptr->remove_from_hashtable(*it);
+ fixalloc->free(it);
+ }
+}
+
+void evict_item(item& item_ref)
+{
+ cache_ptr->evict(item_ref);
+}
+
class sharded_cache {
private:
distributed<cache>& _peers;
@@ -694,7 +1023,7 @@ public:
future<bool> set(item_insertion_data& insertion) {
auto cpu = get_cpu(insertion.key);
if (engine().cpu_id() == cpu) {
- return make_ready_future<bool>(_peers.local().set(insertion));
+ return _peers.local().set(insertion);
}
return _peers.invoke_on(cpu, &cache::set<remote_origin_tag>, std::ref(insertion));
}
@@ -703,7 +1032,7 @@ public:
future<bool> add(item_insertion_data& insertion) {
auto cpu = get_cpu(insertion.key);
if (engine().cpu_id() == cpu) {
- return make_ready_future<bool>(_peers.local().add(insertion));
+ return _peers.local().add(insertion);
}
return _peers.invoke_on(cpu, &cache::add<remote_origin_tag>, std::ref(insertion));
}
@@ -712,7 +1041,7 @@ public:
future<bool> replace(item_insertion_data& insertion) {
auto cpu = get_cpu(insertion.key);
if (engine().cpu_id() == cpu) {
- return make_ready_future<bool>(_peers.local().replace(insertion));
+ return _peers.local().replace(insertion);
}
return _peers.invoke_on(cpu, &cache::replace<remote_origin_tag>, std::ref(insertion));
}
@@ -724,7 +1053,7 @@ public:
}

// The caller must keep @key live until the resulting future resolves.
- future<item_ptr> get(const item_key& key) {
+ future<item_data_ptr> get(const item_key& key) {
auto cpu = get_cpu(key);
return _peers.invoke_on(cpu, &cache::get, std::ref(key));
}
@@ -733,7 +1062,7 @@ public:
future<cas_result> cas(item_insertion_data& insertion, item::version_type version) {
auto cpu = get_cpu(insertion.key);
if (engine().cpu_id() == cpu) {
- return make_ready_future<cas_result>(_peers.local().cas(insertion, version));
+ return _peers.local().cas(insertion, version);
}
return _peers.invoke_on(cpu, &cache::cas<remote_origin_tag>, std::ref(insertion), std::move(version));
}
@@ -743,21 +1072,19 @@ public:
}

// The caller must keep @key live until the resulting future resolves.
- future<std::pair<item_ptr, bool>> incr(item_key& key, uint64_t delta) {
+ auto incr(item_key& key, uint64_t delta) {
auto cpu = get_cpu(key);
if (engine().cpu_id() == cpu) {
- return make_ready_future<std::pair<item_ptr, bool>>(
- _peers.local().incr<local_origin_tag>(key, delta));
+ return _peers.local().incr<local_origin_tag>(key, delta);
}
return _peers.invoke_on(cpu, &cache::incr<remote_origin_tag>, std::ref(key), std::move(delta));
}

// The caller must keep @key live until the resulting future resolves.
- future<std::pair<item_ptr, bool>> decr(item_key& key, uint64_t delta) {
+ auto decr(item_key& key, uint64_t delta) {
auto cpu = get_cpu(key);
if (engine().cpu_id() == cpu) {
- return make_ready_future<std::pair<item_ptr, bool>>(
- _peers.local().decr(key, delta));
+ return _peers.local().decr<local_origin_tag>(key, delta);
}
return _peers.invoke_on(cpu, &cache::decr<remote_origin_tag>, std::ref(key), std::move(delta));
}
@@ -808,7 +1135,7 @@ private:
memcache_ascii_parser _parser;
item_key _item_key;
item_insertion_data _insertion;
- std::vector<item_ptr> _items;
+ std::vector<item_data_ptr> _items;
private:
static constexpr const char *msg_crlf = "\r\n";
static constexpr const char *msg_error = "ERROR\r\n";
@@ -826,24 +1153,29 @@ private:
static constexpr const char *msg_error_non_numeric_value = "CLIENT_ERROR cannot increment or decrement non-numeric value\r\n";
private:
template <bool WithVersion>
- static void append_item(scattered_message<char>& msg, item_ptr item) {
- if (!item) {
+ static void append_item(scattered_message<char>& msg, item_data_ptr idata) {
+ if (!idata) {
return;
}
-
msg.append_static("VALUE ");
- msg.append_static(item->key());
- msg.append_static(item->ascii_prefix());
+ for (auto&& vec : idata->key()) {
+ msg.append_static(static_cast<char*>(vec.iov_base), vec.iov_len);
+ }
+ idata->iterate_prefix([&msg] (void* ptr, size_t len) {
+ msg.append_static(static_cast<char*>(ptr), len);
+ });

if (WithVersion) {
msg.append_static(" ");
- msg.append(to_sstring(item->version()));
+ msg.append(to_sstring(idata->parent().version()));
}

msg.append_static(msg_crlf);
- msg.append_static(item->value());
+ idata->iterate_value([&msg] (void* ptr, size_t len) {
+ msg.append_static(static_cast<char*>(ptr), len);
+ });
msg.append_static(msg_crlf);
- msg.on_delete([item = std::move(item)] {});
+ msg.on_delete([idata = std::move(idata)] {});
}

template <bool WithVersion>
@@ -1106,17 +1438,21 @@ public:
return std::move(f).discard_result();
}
return std::move(f).then([&out] (auto result) {
- auto item = std::move(result.first);
- if (!item) {
+ auto idata = std::move(result.first);
+ if (!idata) {
return out.write(msg_not_found);
}
auto incremented = result.second;
if (!incremented) {
return out.write(msg_error_non_numeric_value);
}
- return out.write(item->value().data(), item->value_size()).then([&out] {
- return out.write(msg_crlf);
+ scattered_message<char> msg;
+ idata->iterate_value([&msg] (void* ptr, size_t len) {
+ msg.append_static(static_cast<char*>(ptr), len);
});
+ msg.append_static(msg_crlf);
+ msg.on_delete([idata = std::move(idata)] { });
+ return out.write(std::move(msg));
});
}

@@ -1127,17 +1463,21 @@ public:
return std::move(f).discard_result();
}
return std::move(f).then([&out] (auto result) {
- auto item = std::move(result.first);
- if (!item) {
+ auto idata = std::move(result.first);
+ if (!idata) {
return out.write(msg_not_found);
}
auto decremented = result.second;
if (!decremented) {
return out.write(msg_error_non_numeric_value);
}
- return out.write(item->value().data(), item->value_size()).then([&out] {
- return out.write(msg_crlf);
+ scattered_message<char> msg;
+ idata->iterate_value([&msg] (void* ptr, size_t len) {
+ msg.append_static(static_cast<char*>(ptr), len);
});
+ msg.append_static(msg_crlf);
+ msg.on_delete([idata = std::move(idata)] { });
+ return out.write(std::move(msg));
});
}
};
@@ -1356,10 +1696,6 @@ int main(int ac, char** av) {
app.add_options()
("max-datagram-size", bpo::value<int>()->default_value(memcache::udp_server::default_max_datagram_size),
"Maximum size of UDP datagram")
- ("max-slab-size", bpo::value<uint64_t>()->default_value(memcache::default_per_cpu_slab_size/MB),
- "Maximum memory to be used for items (value in megabytes) (reclaimer is disabled if set)")
- ("slab-page-size", bpo::value<uint64_t>()->default_value(memcache::default_slab_page_size/MB),
- "Size of slab page (value in megabytes)")
("stats",
"Print basic statistics periodically (every second)")
("port", bpo::value<uint16_t>()->default_value(11211),
@@ -1374,9 +1710,7 @@ int main(int ac, char** av) {

auto&& config = app.configuration();
uint16_t port = config["port"].as<uint16_t>();
- uint64_t per_cpu_slab_size = config["max-slab-size"].as<uint64_t>() * MB;
- uint64_t slab_page_size = config["slab-page-size"].as<uint64_t>() * MB;
- return cache_peers.start(std::move(per_cpu_slab_size), std::move(slab_page_size)).then([&system_stats] {
+ return cache_peers.start().then([&system_stats] {
return system_stats.start(clock_type::now());
}).then([&] {
std::cout << PLATFORM << " memcached " << VERSION << "\n";
--
2.1.4

Dor Laor

<dor@cloudius-systems.com>
unread,
May 7, 2015, 5:13:59 PM5/7/15
to Paweł Dziepak, seastar-dev
On Thu, May 7, 2015 at 7:21 PM, Paweł Dziepak <pdzi...@quarnos.org> wrote:
Hello,
This is the fourth version of the flashcache patchset, hopefully the last one
without the support of on disk storage. Most of the changes in v4 were made to
improve performance. This includes changing the way per-object metadata is
stored. In the previous versions object data was directly preceeded by an
instance of log_alloc::metadata class which kept all the information needed
by the log-structured allocator. Now, this class is gone. Reference counting is
done on per segment basis and the rest of the metadata is stored directly inside
the owner object using log_alloc_ptr objects as hooks. In order to allow
compactor to iterate through objects in a block each segments contains an
intrusive list of log_alloc_ptr instances representing the objects which first
byte is in that segments.

Below are summaries of the performance tests I did. Like before the tests were
run on two c4.8xlarge EC2 instances, but this time native network stack and DPDK

Great
There seems to be a lot of misses - 227k/s * 60 is the overall TPS. Out of it
50% are set. So you end with 6M misses, that's too much. Please correct me if I'm wrong.
It may be less relevant for this allocator test but in general, a cache app should be tested
with a more realistic scenario. 
 

two consecutive (without server restart) runs, both with get:set ratio 1:1 and
key size 64 but the value size in the first on was 128 and in the second 1024
object memory limited to 128MB, segment size 16kB, eviction step 10%
single threaded client
             memcached 128   memcached 1024   flashcache 128   flashcache 1024
Net rate:          44.0M/s         129.5M/s          41.0M/s          156.0M/s

Is this the single and main benefit of this implementation - 2nd cycle of different size?
Will performance of memcache degrade more in time? What if the keys are mixed
over time and not run in 2 separate cycles? Don't redo tests unless you think
my questions make sense here.

Thanks, Dor


--
2.1.4

--
You received this message because you are subscribed to the Google Groups "seastar-dev" group.
To unsubscribe from this group and stop receiving emails from it, send an email to seastar-dev...@googlegroups.com.
To post to this group, send email to seast...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/seastar-dev/1431015674-21847-1-git-send-email-pdziepak%40quarnos.org.
For more options, visit https://groups.google.com/d/optout.

Raphael S. Carvalho

<raphaelsc@cloudius-systems.com>
unread,
May 7, 2015, 6:18:14 PM5/7/15
to Dor Laor, Paweł Dziepak, seastar-dev
Memcache performance would terribly degrade when cache is full, and all of a sudden, the size of objects from set requests change dramatically. Slab allocator is currently very bad at handling such changing workloads. Why? As we know, because it currently cannot reassign slab pages across other classes of slab. It turns out that the slab allocator feature slab page reassignment is very difficult to implement efficiently.



--
Raphael S. Carvalho

Paweł Dziepak

<pdziepak@quarnos.org>
unread,
May 8, 2015, 6:25:50 AM5/8/15
to Dor Laor, seastar-dev
I agree that 1:1: set:get ratio may not be very realistic, but such tests were useful to see the performance penalty in write heavy scenarios. Most of the additional complexity of the log-structured allocator is in the allocation and compaction paths so I think it is good to see how much the performance was degraded. Without workloads with large amount of eviction and compaction I wouldn't know how bad for the overall performance they can be.
 
 

two consecutive (without server restart) runs, both with get:set ratio 1:1 and
key size 64 but the value size in the first on was 128 and in the second 1024
object memory limited to 128MB, segment size 16kB, eviction step 10%
single threaded client
             memcached 128   memcached 1024   flashcache 128   flashcache 1024
Net rate:          44.0M/s         129.5M/s          41.0M/s          156.0M/s

Is this the single and main benefit of this implementation - 2nd cycle of different size?
Will performance of memcache degrade more in time? What if the keys are mixed
over time and not run in 2 separate cycles? Don't redo tests unless you think
my questions make sense here.

I actually considered doing some tests with varying object size in a single run myself (actually the very first results I sent were done with random object size, but they were with posix stack and flashcache has change a lot since then). I wouldn't expect much change in flashcache behavior though -- it keeps the key and the value in separate objects so even the fixed item size tests are in fact "2 different sizes" tests for flashcache. After all, all that increased complexity (relative to slab based memcached) was added mainly to deal with different object sizes well. The fixed item size tests are basically the best case for slab based memcached.
Reply all
Reply to author
Forward
0 new messages