apps/flashcache/flashcache.cc | 862 +++++++++++++++++++++++++++++-------------
1 file changed, 598 insertions(+), 264 deletions(-)
diff --git a/apps/flashcache/flashcache.cc b/apps/flashcache/flashcache.cc
index 7afb29d..3facc6b 100644
--- a/apps/flashcache/flashcache.cc
+++ b/apps/flashcache/flashcache.cc
@@ -36,7 +36,7 @@
#include "core/distributed.hh"
#include "core/vector-data-sink.hh"
#include "core/bitops.hh"
-#include "core/slab.hh"
+#include "core/log_alloc.hh"
#include "core/align.hh"
#include "net/api.hh"
#include "net/packet-data-source.hh"
@@ -54,10 +54,8 @@ namespace bi = boost::intrusive;
namespace memcache {
-static constexpr double default_slab_growth_factor = 1.25;
-static constexpr uint64_t default_slab_page_size = 1UL*MB;
-static constexpr uint64_t default_per_cpu_slab_size = 0UL; // zero means reclaimer is enabled.
-static __thread slab_allocator<item>* slab;
+typedef segment_allocator<16 * 1024> segment_allocator_type;
+static __thread segment_allocator_type* segalloc;
template<typename T>
using optional = boost::optional<T>;
@@ -78,7 +76,7 @@ struct expiration {
}
}
- bool ever_expires() {
+ bool ever_expires() const {
return _time;
}
@@ -87,7 +85,7 @@ struct expiration {
}
};
-class item : public slab_item_base {
+class item {
public:
using version_type = uint64_t;
using time_point = clock_type::time_point;
@@ -99,41 +97,25 @@ private:
version_type _version;
hook_type _cache_link;
bi::list_member_hook<> _timer_link;
- size_t _key_hash;
+ bi::list_member_hook<> _lru_link;
expiration _expiry;
+ size_t _key_hash;
+ log_alloc_ptr _key;
uint32_t _value_size;
- uint32_t _slab_page_index;
uint16_t _ref_count;
- uint8_t _key_size;
uint8_t _ascii_prefix_size;
- char _data[]; // layout: data=key, (data+key_size)=ascii_prefix, (data+key_size+ascii_prefix_size)=value.
+ log_alloc_ptr _value;
friend class cache;
public:
- item(uint32_t slab_page_index, item_key&& key, sstring&& ascii_prefix,
- sstring&& value, expiration expiry, version_type version = 1)
- : _version(version)
- , _key_hash(key.hash())
- , _expiry(expiry)
- , _value_size(value.size())
- , _slab_page_index(slab_page_index)
- , _ref_count(0U)
- , _key_size(key.key().size())
- , _ascii_prefix_size(ascii_prefix.size())
- {
- assert(_key_size <= std::numeric_limits<uint8_t>::max());
- assert(_ascii_prefix_size <= std::numeric_limits<uint8_t>::max());
- // storing key
- memcpy(_data, key.key().c_str(), _key_size);
- // storing ascii_prefix
- memcpy(_data + align_up(_key_size, field_alignment), ascii_prefix.c_str(), _ascii_prefix_size);
- // storing value
- memcpy(_data + align_up(_key_size, field_alignment) + align_up(_ascii_prefix_size, field_alignment),
- value.c_str(), _value_size);
- }
-
+ item() : _version(1), _ref_count(0U) { }
+ ~item();
item(const item&) = delete;
item(item&&) = delete;
+ void set_expiry(expiration expiry) {
+ _expiry = expiry;
+ }
+
clock_type::time_point get_timeout() {
return _expiry.to_time_point();
}
@@ -142,71 +124,48 @@ public:
return _version;
}
- const std::experimental::string_view key() const {
- return std::experimental::string_view(_data, _key_size);
+ bool erased() const {
+ return !_value;
}
- const std::experimental::string_view ascii_prefix() const {
- const char *p = _data + align_up(_key_size, field_alignment);
- return std::experimental::string_view(p, _ascii_prefix_size);
+ const log_alloc_ptr& key() const { return _key; }
+ log_alloc_ptr& reset_key() {
+ assert(!_key);
+ return _key;
}
+ void set_key_hash(size_t hash) { _key_hash = hash; }
+
+ const sstring ascii_prefix() const;
- const std::experimental::string_view value() const {
- const char *p = _data + align_up(_key_size, field_alignment) +
- align_up(_ascii_prefix_size, field_alignment);
- return std::experimental::string_view(p, _value_size);
+ const log_alloc_ptr& value() const { return _value; }
+ log_alloc_ptr& reset_value();
+ void set_value_size(size_t value_size, size_t prefix_size) {
+ _value_size = value_size;
+ _ascii_prefix_size = prefix_size;
}
size_t key_size() const {
- return _key_size;
+ return _key.size;
}
size_t ascii_prefix_size() const {
return _ascii_prefix_size;
}
-
+ size_t value_offset() const {
+ return _ascii_prefix_size;
+ }
size_t value_size() const {
return _value_size;
}
- optional<uint64_t> data_as_integral() {
- auto str = value().data();
- if (str[0] == '-') {
- return {};
- }
-
- auto len = _value_size;
-
- // Strip trailing space
- while (len && str[len - 1] == ' ') {
- len--;
- }
-
- try {
- return {boost::lexical_cast<uint64_t>(str, len)};
- } catch (const boost::bad_lexical_cast& e) {
- return {};
- }
- }
+ optional<uint64_t> data_as_integral();
// needed by timer_set
bool cancel() {
return false;
}
- // Methods required by slab allocator.
- uint32_t get_slab_page_index() const {
- return _slab_page_index;
- }
- bool is_unlocked() const {
- return _ref_count == 1;
- }
-
- friend bool operator==(const item &a, const item &b) {
- return (a._key_hash == b._key_hash) &&
- (a._key_size == b._key_size) &&
- (memcmp(a._data, b._data, a._key_size) == 0);
- }
+ friend bool operator==(const item &a, const item &b);
friend std::size_t hash_value(const item &i) {
return i._key_hash;
@@ -215,31 +174,163 @@ public:
friend inline void intrusive_ptr_add_ref(item* it) {
assert(it->_ref_count >= 0);
++it->_ref_count;
- if (it->_ref_count == 2) {
- slab->lock_item(it);
+ }
+
+ friend void intrusive_ptr_release(item* it);
+
+ friend class item_key_cmp;
+};
+
+typedef fixed_alloc<item, segment_allocator_type> fixed_allocator_type;
+static __thread fixed_allocator_type* fixalloc;
+
+void evict_item(item&);
+
+// LRU eviction policy used as the evictor of log_alloc (see
+// log_allocator_type). Every item on the list holds one reference:
+// add() takes it, remove() and eviction drop it. The most recently used
+// item sits at the front of the list; eviction proceeds from the back.
+template<typename EvictionStep = std::ratio<1, 10>>
+class lru_evictor {
+public:
+ // Start tracking `it` as most recently used; takes a reference on it.
+ void add(item& it) {
+ intrusive_ptr_add_ref(&it);
+ _lru.push_front(it);
+ }
+ // Stop tracking `it` and drop the reference taken by add().
+ void remove(item& it) {
+ _lru.erase(_lru.iterator_to(it));
+ intrusive_ptr_release(&it);
+ }
+ // Mark `it` as most recently used by moving it to the front.
+ void touch(item& it) {
+ _lru.erase(_lru.iterator_to(it));
+ _lru.push_front(it);
+ }
+ // Evict least recently used items until unused memory reaches
+ // max(needed, total_memory() * EvictionStep * attempt) or the list is
+ // empty, so later attempts free progressively more memory.
+ // NOTE(review): presumably invoked by log_alloc when an allocation cannot
+ // be satisfied -- confirm against log_alloc.
+ void evict(size_t needed, unsigned attempt) {
+ auto total = total_memory();
+ assert(total >= needed);
+ auto step = total * EvictionStep::num / EvictionStep::den * attempt;
+ auto target = std::max(step, needed);
+ while (!_lru.empty() && unused_memory() < target) {
+ auto& obj = _lru.back();
+ _lru.pop_back();
+ // evict_item() erases the value; the release below drops the LRU's
+ // reference, which may free the item itself.
+ evict_item(obj);
+ intrusive_ptr_release(&obj);
+ }
+ }
+ // Evict every tracked item regardless of memory pressure (flush_all()).
+ void evict_all() {
+ while (!_lru.empty()) {
+ auto& obj = _lru.back();
+ _lru.pop_back();
+ evict_item(obj);
+ intrusive_ptr_release(&obj);
}
}
+private:
+ // Total memory managed by the segment allocator, in bytes.
+ size_t total_memory() const;
+ // Free segments plus unused space inside log_alloc, in bytes.
+ size_t unused_memory() const;
+private:
+ bi::list<item,
+ bi::member_hook<item, bi::list_member_hook<>, &item::_lru_link>
+ > _lru;
+};
- friend inline void intrusive_ptr_release(item* it) {
- --it->_ref_count;
- if (it->_ref_count == 1) {
- slab->unlock_item(it);
- } else if (it->_ref_count == 0) {
- slab->free(it);
+typedef lru_evictor<> evictor_type;
+static __thread evictor_type* evictor;
+typedef log_alloc<segment_allocator_type, evictor_type> log_allocator_type;
+static __thread log_allocator_type* logalloc;
+
+template<typename EvictionStep>
+inline size_t lru_evictor<EvictionStep>::total_memory() const {
+ return segalloc->total() * segalloc->segment_size;
+}
+
+template<typename EvictionStep>
+inline size_t lru_evictor<EvictionStep>::unused_memory() const {
+ return segalloc->unused() * segalloc->segment_size + logalloc->unused_memory();
+}
+
+item::~item()
+{
+ if (_value) {
+ logalloc->free(_value);
+ }
+ if (_key) {
+ logalloc->free(_key);
+ }
+}
+
+log_alloc_ptr& item::reset_value()
+{
+ if (_value) {
+ logalloc->free(_value);
+ _version++;
+ } else {
+ _version = 0;
+ }
+ _value = log_alloc_ptr();
+ return _value;
+}
+
+const sstring item::ascii_prefix() const
+{
+ sstring str(sstring::initialized_later(), _ascii_prefix_size);
+ auto obj = log_allocator_type::object_view(_value);
+ auto it = str.begin();
+ size_t left = _ascii_prefix_size;
+ for (auto&& v : obj) {
+ auto size = std::min(left, v.iov_len);
+ it = std::copy(static_cast<char*>(v.iov_base),
+ static_cast<char*>(v.iov_base) + size, it);
+ left -= size;
+ if (!left) {
+ break;
}
- assert(it->_ref_count >= 0);
}
+ return str;
+}
- friend class item_key_cmp;
-};
+// Parse the item's value as an unsigned decimal integer (memcached incr/decr
+// semantics). Returns an empty optional when the value is empty, negative,
+// or not a number. Trailing spaces are ignored.
+optional<uint64_t> item::data_as_integral()
+{
+ // The value follows the ASCII prefix inside the same log object; copy it
+ // into a contiguous buffer so it can be parsed.
+ std::unique_ptr<char[]> temp(new char[_value_size]);
+ const char* str = "";
+ if (_value) {
+ auto obj = log_allocator_type::object_view(_value);
+ auto ret = obj.read(value_offset(), temp.get(), _value_size);
+ assert(ret == _value_size);
+ str = temp.get();
+ } else {
+ assert(!_value_size);
+ }
+
+ auto len = _value_size;
+
+ // An empty value is not a number. Checking the length first also avoids
+ // reading str[0] past the end of a zero-length buffer.
+ if (!len || str[0] == '-') {
+ return {};
+ }
+
+ // Strip trailing space
+ while (len && str[len - 1] == ' ') {
+ len--;
+ }
+
+ try {
+ return {boost::lexical_cast<uint64_t>(str, len)};
+ } catch (const boost::bad_lexical_cast& e) {
+ return {};
+ }
+}
+
+// Two items are equal when their key hashes, key sizes and key bytes match.
+// NOTE(review): assumes the static object_view::equal(a, b) returns true for
+// equal contents, like the member overload used by item_key_cmp. Negating
+// it made distinct keys compare equal (and equal keys unequal) inside the
+// unordered_set.
+bool operator==(const item &a, const item &b) {
+ return (a._key_hash == b._key_hash) &&
+ (a.key_size() == b.key_size()) &&
+ log_allocator_type::object_view::equal(a.key(), b.key());
+}
struct item_key_cmp
{
private:
bool compare(const item_key& key, const item& it) const {
- return (it._key_hash == key.hash()) &&
- (it._key_size == key.key().size()) &&
- (memcmp(it._data, key.key().c_str(), it._key_size) == 0);
+ if (it._key_hash != key.hash() || it.key_size() != key.key().size()) {
+ return false;
+ }
+ auto view = log_allocator_type::object_view(it.key());
+ return view.equal(key.key().c_str(), key.key().size());
}
public:
bool operator()(const item_key& key, const item& it) const {
@@ -253,6 +344,70 @@ public:
using item_ptr = foreign_ptr<boost::intrusive_ptr<item>>;
+// Read-only handle to an item's value, returned by get/incr/decr.
+// It holds a reference to the parent item and keeps the value's log object
+// locked (logalloc->lock_object) for as long as the handle owns it.
+// element_type and operator* let foreign_ptr<item_data> (item_data_ptr)
+// treat it like a pointer.
+class item_data {
+public:
+ using element_type = item_data;
+ item_data() : _value_size(0), _ascii_prefix_size(0) { }
+ item_data(boost::intrusive_ptr<item> it) : _item(it),
+ _value_size(it->value_size()), _ascii_prefix_size(it->ascii_prefix_size()),
+ _value(it->value()) {
+ assert(_value);
+ logalloc->lock_object(_value);
+ }
+ item_data(const item_data&) = delete;
+ // Moving transfers the lock. The moved-from handle loses its item
+ // reference (a moved-from intrusive_ptr is null), so it will not unlock.
+ item_data(item_data&& o)
+ : _item(std::move(o._item))
+ , _value_size(o._value_size)
+ , _ascii_prefix_size(o._ascii_prefix_size)
+ , _value(std::move(o._value)) { }
+ item_data& operator=(const item_data&) = delete;
+ item_data& operator=(item_data&& o) {
+ if (this != &o) {
+ // Release the lock on the value this handle is giving up.
+ unlock();
+ _item = std::move(o._item);
+ _value_size = o._value_size;
+ _ascii_prefix_size = o._ascii_prefix_size;
+ _value = std::move(o._value);
+ }
+ return *this;
+ }
+ ~item_data() {
+ unlock();
+ }
+ operator bool() const { return static_cast<bool>(_item); }
+ item& parent() const { return *_item; }
+ log_allocator_type::object_view key() const {
+ return _item->key();
+ }
+ log_allocator_type::object_view value() const { return _value; }
+ size_t value_size() const { return _value_size; }
+ size_t ascii_prefix_size() const { return _ascii_prefix_size; }
+
+ // Call func(ptr, len) for each contiguous chunk of the ASCII prefix,
+ // which occupies the first ascii_prefix_size() bytes of the value object.
+ template<typename Func>
+ void iterate_prefix(Func&& func) {
+ auto prefix_size = ascii_prefix_size();
+ for (auto&& vec : _value) {
+ auto size = std::min(prefix_size, vec.iov_len);
+ func(vec.iov_base, size);
+ prefix_size -= size;
+ if (!prefix_size) {
+ break;
+ }
+ }
+ }
+ // Call func(ptr, len) for each contiguous chunk of the value proper:
+ // value_size() bytes following the prefix. Bounded like iterate_prefix so
+ // bytes past the value are never handed out.
+ template<typename Func>
+ void iterate_value(Func&& func) {
+ auto skip = ascii_prefix_size();
+ auto left = value_size();
+ for (auto&& vec : _value) {
+ if (!left) {
+ break;
+ }
+ if (skip >= vec.iov_len) {
+ skip -= vec.iov_len;
+ continue;
+ }
+ auto base = static_cast<char*>(vec.iov_base) + skip;
+ auto size = std::min(left, vec.iov_len - skip);
+ func(base, size);
+ skip = 0;
+ left -= size;
+ }
+ }
+
+ item_data& operator*() const { return *const_cast<item_data*>(this); }
+private:
+ // Unlock the value only if this handle still owns it. A default-constructed
+ // or moved-from handle has no item and holds no lock.
+ void unlock() {
+ if (_item && _value) {
+ logalloc->unlock_object(_value);
+ }
+ }
+private:
+ boost::intrusive_ptr<item> _item;
+ size_t _value_size;
+ size_t _ascii_prefix_size;
+ log_allocator_type::object_view _value;
+};
+
+using item_data_ptr = foreign_ptr<item_data>;
+
struct cache_stats {
size_t _get_hits {};
size_t _get_misses {};
@@ -324,6 +479,10 @@ struct item_insertion_data {
expiration expiry;
};
+class cache;
+
+__thread cache* cache_ptr;
+
class cache {
private:
using cache_type = bi::unordered_set<item,
@@ -340,49 +499,35 @@ private:
timer<> _timer;
cache_stats _stats;
timer<> _flush_timer;
+ std::unique_ptr<memory::reclaimer> _reclaimer;
private:
size_t item_size(item& item_ref) {
- constexpr size_t field_alignment = alignof(void*);
- return sizeof(item) +
- align_up(item_ref.key_size(), field_alignment) +
- align_up(item_ref.ascii_prefix_size(), field_alignment) +
- item_ref.value_size();
- }
-
- size_t item_size(item_insertion_data& insertion) {
- constexpr size_t field_alignment = alignof(void*);
- auto size = sizeof(item) +
- align_up(insertion.key.key().size(), field_alignment) +
- align_up(insertion.ascii_prefix.size(), field_alignment) +
- insertion.data.size();
-#ifdef __DEBUG__
- static bool print_item_footprint = true;
- if (print_item_footprint) {
- print_item_footprint = false;
- std::cout << __FUNCTION__ << ": " << size << "\n";
- std::cout << "sizeof(item) " << sizeof(item) << "\n";
- std::cout << "key.size " << insertion.key.key().size() << "\n";
- std::cout << "value.size " << insertion.data.size() << "\n";
- std::cout << "ascii_prefix.size " << insertion.ascii_prefix.size() << "\n";
- }
-#endif
- return size;
+ return sizeof(item) + item_ref.key_size() + item_ref.ascii_prefix_size()
+ + item_ref.value_size();
}
- template <bool IsInCache = true, bool IsInTimerList = true, bool Release = true>
+ enum class in_timer_list {
+ no,
+ yes,
+ };
+ enum class in_lru_list {
+ no,
+ yes,
+ };
+ template <in_timer_list timer = in_timer_list::yes, in_lru_list lru = in_lru_list::yes>
void erase(item& item_ref) {
- if (IsInCache) {
- _cache.erase(_cache.iterator_to(item_ref));
+ if (item_ref.erased()) {
+ return;
}
- if (IsInTimerList) {
+ if (timer == in_timer_list::yes) {
if (item_ref._expiry.ever_expires()) {
_alive.remove(item_ref);
}
}
+ item_ref.reset_value();
_stats._bytes -= item_size(item_ref);
- if (Release) {
- // memory used by item shouldn't be freed when slab is replacing it with another item.
- intrusive_ptr_release(&item_ref);
+ if (lru == in_lru_list::yes) {
+ evictor->remove(item_ref);
}
}
@@ -391,7 +536,7 @@ private:
while (!exp.empty()) {
auto item = &*exp.begin();
exp.pop_front();
- erase<true, false>(*item);
+ erase<in_timer_list::no>(*item);
_stats._expired++;
}
_timer.arm(_alive.get_next_timeout());
@@ -399,45 +544,11 @@ private:
inline
cache_iterator find(const item_key& key) {
- return _cache.find(key, std::hash<item_key>(), item_key_cmp());
- }
-
- template <typename Origin>
- inline
- cache_iterator add_overriding(cache_iterator i, item_insertion_data& insertion) {
- auto& old_item = *i;
- uint64_t old_item_version = old_item._version;
-
- erase(old_item);
-
- size_t size = item_size(insertion);
- auto new_item = slab->create(size, Origin::move_if_local(insertion.key), Origin::move_if_local(insertion.ascii_prefix),
- Origin::move_if_local(insertion.data), insertion.expiry, old_item_version + 1);
- intrusive_ptr_add_ref(new_item);
-
- auto insert_result = _cache.insert(*new_item);
- assert(insert_result.second);
- if (insertion.expiry.ever_expires() && _alive.insert(*new_item)) {
- _timer.rearm(new_item->get_timeout());
- }
- _stats._bytes += size;
- return insert_result.first;
- }
-
- template <typename Origin>
- inline
- void add_new(item_insertion_data& insertion) {
- size_t size = item_size(insertion);
- auto new_item = slab->create(size, Origin::move_if_local(insertion.key), Origin::move_if_local(insertion.ascii_prefix),
- Origin::move_if_local(insertion.data), insertion.expiry);
- intrusive_ptr_add_ref(new_item);
- auto& item_ref = *new_item;
- _cache.insert(item_ref);
- if (insertion.expiry.ever_expires() && _alive.insert(item_ref)) {
- _timer.rearm(item_ref.get_timeout());
+ auto it = _cache.find(key, std::hash<item_key>(), item_key_cmp());
+ if (it != _cache.end() && !it->erased()) {
+ evictor->touch(*it);
}
- _stats._bytes += size;
- maybe_rehash();
+ // Erased items (no value associated yet) are treated as absent. Test for
+ // end() before dereferencing: a miss must never touch *end().
+ return (it != _cache.end() && !it->erased()) ? it : _cache.end();
}
void maybe_rehash() {
@@ -455,35 +566,146 @@ private:
_resize_up_threshold = _cache.bucket_count() * load_factor;
}
}
+
+ // Write the new ASCII prefix and value from `insert` into the freshly
+ // allocated log object `data`. Then re-register the item in the expiration
+ // timer list under insert.expiry.
+ void set_item(item& item_ref, item_insertion_data&& insert, log_allocator_type::object_view&& data) {
+ data.write(0, insert.ascii_prefix.c_str(), insert.ascii_prefix.size());
+ data.write(insert.ascii_prefix.size(), insert.data.c_str(), insert.data.size());
+ item_ref.set_value_size(insert.data.size(), insert.ascii_prefix.size());
+
+ if (item_ref._expiry.ever_expires()) {
+ _alive.remove(item_ref);
+ }
+ // Adopt the new expiry before re-inserting. Otherwise the item is re-armed
+ // with its old timeout and e.g. a cas with a new exptime loses it.
+ item_ref.set_expiry(insert.expiry);
+ if (insert.expiry.ever_expires() && _alive.insert(item_ref)) {
+ _timer.rearm(item_ref.get_timeout());
+ }
+ }
+
+ enum class can_overwrite {
+ no,
+ yes,
+ };
+ enum class can_create {
+ no,
+ yes,
+ };
+
+ // Get a reference to the item for `key`, or create a new one in the erased
+ // state.
+ // Erased state: the item exists and is in the cache although no value is
+ // associated with it. It is an intermediate state indicating that some
+ // operation on this item is in progress. Get requests ignore erased items.
+ //
+ // Resolves to (item currently has a value, item pointer). The pointer is
+ // null only when create == can_create::no and no item with `key` exists.
+ template<can_create create>
+ future<std::pair<bool, boost::intrusive_ptr<item>>> get_item(item_key&& key) {
+ auto it = _cache.find(key, std::hash<item_key>(), item_key_cmp());
+ if (it != _cache.end()) {
+ boost::intrusive_ptr<item> ptr(&*it);
+ return make_ready_future<std::pair<bool, boost::intrusive_ptr<item>>>(std::make_pair(!it->erased(), ptr));
+ }
+ if (create == can_create::no) {
+ return make_ready_future<std::pair<bool, boost::intrusive_ptr<item>>>(std::make_pair(false, nullptr));
+ }
+ // Not found: allocate the item, then storage for its key. Both
+ // allocations complete through futures, so the table may change meanwhile.
+ return fixalloc->alloc().then([this, key = std::move(key)] (item* new_item) mutable {
+ auto key_size = key.key().size();
+ // The first callback supplies the slot (the item's key pointer) that
+ // the allocation is stored into; the second receives that slot.
+ return logalloc->alloc(key_size, [new_item] {
+ return &new_item->reset_key();
+ }, [this, new_item, key = std::move(key)] (log_alloc_ptr* key_ptr) {
+ // check again, the item may have been created in the meantime
+ auto it = _cache.find(key, std::hash<item_key>(), item_key_cmp());
+ if (it != _cache.end()) {
+ // NOTE(review): confirm fixalloc->free runs ~item so the key
+ // allocated above is released as well.
+ fixalloc->free(new_item);
+ boost::intrusive_ptr<item> ptr(&*it);
+ return std::make_pair(!it->erased(), ptr);
+ }
+
+ log_allocator_type::object_view key_buffer(*key_ptr);
+ key_buffer.write(0, key.key().c_str(), key.key().size());
+ new_item->set_key_hash(key.hash());
+
+ // Publish the new item in the hash table; it has a key but no value
+ // yet, i.e. it is in the erased state.
+ boost::intrusive_ptr<item> ptr(new_item);
+ _cache.insert(*new_item);
+ maybe_rehash();
+ return std::make_pair(false, ptr);
+ });
+ });
+ }
+
+ // can_overwrite decides whether the addition can succeed if an item with
+ // the given key already exists. can_create decides whether the call to
+ // add() can create a new item if there isn't any with the specified key.
+ // Resolves to (value stored, key had a value when it was looked up).
+ template<typename Origin, can_overwrite overwrite, can_create create>
+ future<std::pair<bool, bool>> add(item_insertion_data& insertion) {
+ static_assert(overwrite == can_overwrite::yes || create == can_create::yes, "impossible item add conditions");
+ auto key = Origin::move_if_local(insertion.key);
+ return get_item<create>(std::move(key)).then([this, insert = Origin::move_if_local(insertion)] (std::pair<bool, boost::intrusive_ptr<item>> ret) mutable {
+ if (!ret.second || (overwrite == can_overwrite::no && ret.first && !ret.second->erased())) {
+ return make_ready_future<std::pair<bool, bool>>(std::make_pair(false, false));
+ }
+ auto value_size = insert.data.size() + insert.ascii_prefix.size();
+ // The allocation completes asynchronously, so the item's state is
+ // re-checked here before its old value is dropped.
+ return logalloc->alloc(value_size, [this, ptr = ret.second] () -> log_alloc_ptr* {
+ if (ptr->erased()) {
+ if (create == can_create::no) {
+ return nullptr;
+ }
+ evictor->add(*ptr);
+ } else {
+ if (overwrite == can_overwrite::no) {
+ // A value was stored while we waited; add() must not replace it.
+ return nullptr;
+ }
+ evictor->touch(*ptr);
+ _stats._bytes -= item_size(*ptr);
+ // A live item with an expiry is linked in the timer list. Unlink it
+ // so the new expiry can be inserted below without linking the same
+ // hook twice.
+ if (ptr->_expiry.ever_expires()) {
+ _alive.remove(*ptr);
+ }
+ }
+ return &ptr->reset_value();
+ }, [this, ret, insert = std::move(insert)] (log_alloc_ptr* ptr) {
+ if (!ptr) {
+ return std::make_pair(false, false);
+ }
+ log_allocator_type::object_view value(*ptr);
+ value.write(0, insert.ascii_prefix.c_str(), insert.ascii_prefix.size());
+ value.write(insert.ascii_prefix.size(), insert.data.c_str(), insert.data.size());
+
+ auto& item_ref = *ret.second;
+ item_ref.set_value_size(insert.data.size(), insert.ascii_prefix.size());
+ item_ref.set_expiry(insert.expiry);
+ if (insert.expiry.ever_expires() && _alive.insert(item_ref)) {
+ _timer.rearm(item_ref.get_timeout());
+ }
+ _stats._bytes += item_size(item_ref);
+ return std::make_pair(true, ret.first);
+ });
+ });
+ }
public:
- cache(uint64_t per_cpu_slab_size, uint64_t slab_page_size)
+ cache()
: _buckets(new cache_type::bucket_type[initial_bucket_count])
, _cache(cache_type::bucket_traits(_buckets, initial_bucket_count))
{
_timer.set_callback([this] { expire(); });
_flush_timer.set_callback([this] { flush_all(); });
- // initialize per-thread slab allocator.
- slab = new slab_allocator<item>(default_slab_growth_factor, per_cpu_slab_size, slab_page_size,
- [this](item& item_ref) { erase<true, true, false>(item_ref); _stats._evicted++; });
-#ifdef __DEBUG__
- static bool print_slab_classes = true;
- if (print_slab_classes) {
- print_slab_classes = false;
- slab->print_slab_classes();
- }
-#endif
+ cache_ptr = this;
+ segalloc = new segment_allocator_type;
+ evictor = new evictor_type;
+ logalloc = new log_allocator_type(*segalloc, *evictor);
+ fixalloc = new fixed_allocator_type(*segalloc);
+ fixalloc->set_reclaim_hook([] { return logalloc->reclaim(1); });
+ _reclaimer.reset(new memory::reclaimer([this] {
+ logalloc->reclaim(1);
+ segalloc->free_memory();
+ }));
}
~cache() {
- flush_all();
+ flush_all();
+ // Unregister the reclaimer first: its callback uses logalloc and segalloc.
+ _reclaimer.reset();
+ // Tear down in reverse order of construction. fixalloc and logalloc are
+ // built on segalloc, and logalloc also references evictor.
+ delete fixalloc;
+ delete logalloc;
+ delete evictor;
+ delete segalloc;
+ fixalloc = nullptr;
+ logalloc = nullptr;
+ evictor = nullptr;
+ segalloc = nullptr;
+ cache_ptr = nullptr;
+ }
+
+ void remove_from_hashtable(item& item_ref) {
+ _cache.erase(_cache.iterator_to(item_ref));
}
void flush_all() {
_flush_timer.cancel();
- _cache.erase_and_dispose(_cache.begin(), _cache.end(), [this] (item* it) {
- erase<false, true>(*it);
- });
+ evictor->evict_all();
}
void flush_at(clock_type::time_point time_point) {
@@ -491,43 +713,36 @@ public:
}
template <typename Origin = local_origin_tag>
- bool set(item_insertion_data& insertion) {
- auto i = find(insertion.key);
- if (i != _cache.end()) {
- add_overriding<Origin>(i, insertion);
- _stats._set_replaces++;
- return true;
- } else {
- add_new<Origin>(insertion);
- _stats._set_adds++;
- return false;
- }
+ future<bool> set(item_insertion_data& insertion) {
+ return add<Origin, can_overwrite::yes, can_create::yes>(insertion).then([this] (auto ret) {
+ if (ret.second) {
+ _stats._set_replaces++;
+ } else {
+ _stats._set_adds++;
+ }
+ return make_ready_future<bool>(ret.second);
+ });
}
template <typename Origin = local_origin_tag>
- bool add(item_insertion_data& insertion) {
- auto i = find(insertion.key);
- if (i != _cache.end()) {
- return false;
- }
-
- _stats._set_adds++;
- add_new<Origin>(insertion);
- return true;
+ future<bool> add(item_insertion_data& insertion) {
+ return add<Origin, can_overwrite::no, can_create::yes>(insertion).then([this] (auto ret) {
+ if (ret.first) {
+ _stats._set_adds++;
+ }
+ return make_ready_future<bool>(ret.first);
+ });
}
template <typename Origin = local_origin_tag>
- bool replace(item_insertion_data& insertion) {
- auto i = find(insertion.key);
- if (i == _cache.end()) {
- return false;
- }
-
- _stats._set_replaces++;
- add_overriding<Origin>(i, insertion);
- return true;
+ future<bool> replace(item_insertion_data& insertion) {
+ return add<Origin, can_overwrite::yes, can_create::no>(insertion).then([this] (auto ret) {
+ if (ret.first) {
+ _stats._set_replaces++;
+ }
+ return make_ready_future<bool>(ret.first);
+ });
}
-
bool remove(const item_key& key) {
auto i = find(key);
if (i == _cache.end()) {
@@ -540,32 +755,58 @@ public:
return true;
}
- item_ptr get(const item_key& key) {
+ void evict(item& item_ref) {
+ erase<in_timer_list::yes, in_lru_list::no>(item_ref);
+ _stats._evicted++;
+ }
+
+ future<item_data_ptr> get(const item_key& key) {
auto i = find(key);
if (i == _cache.end()) {
_stats._get_misses++;
- return nullptr;
+ return make_ready_future<item_data_ptr>();
}
_stats._get_hits++;
auto& item_ref = *i;
- return item_ptr(&item_ref);
+ return make_ready_future<item_data_ptr>(item_data(&item_ref));
}
template <typename Origin = local_origin_tag>
- cas_result cas(item_insertion_data& insertion, item::version_type version) {
+ future<cas_result> cas(item_insertion_data& insertion, item::version_type version) {
auto i = find(insertion.key);
if (i == _cache.end()) {
_stats._cas_misses++;
- return cas_result::not_found;
+ return make_ready_future<cas_result>(cas_result::not_found);
}
auto& item_ref = *i;
if (item_ref._version != version) {
_stats._cas_badval++;
- return cas_result::bad_version;
+ return make_ready_future<cas_result>(cas_result::bad_version);
}
- _stats._cas_hits++;
- add_overriding<Origin>(i, insertion);
- return cas_result::stored;
+ boost::intrusive_ptr<item> ptr = &item_ref;
+ auto insert = Origin::move_if_local(insertion);
+ auto data_size = insert.data.size() + insert.ascii_prefix.size();
+ // The allocation completes asynchronously, so the item is re-validated in
+ // the first callback before its old value is dropped.
+ return logalloc->alloc(data_size, [this, ptr, version] () -> log_alloc_ptr* {
+ if (ptr->erased() || ptr->_version != version) {
+ return nullptr;
+ }
+ _stats._bytes -= item_size(*ptr);
+ return &ptr->reset_value();
+ }, [this, ptr, insert = std::move(insert)] (log_alloc_ptr* data) mutable {
+ if (!data) {
+ // The store was refused. Tell a deleted item from a concurrent
+ // update by inspecting the item itself: a concurrent set also bumps
+ // the version by one, so the version alone cannot distinguish them.
+ if (!ptr->erased()) {
+ _stats._cas_badval++;
+ return make_ready_future<cas_result>(cas_result::bad_version);
+ } else {
+ _stats._cas_misses++;
+ return make_ready_future<cas_result>(cas_result::not_found);
+ }
+ }
+ set_item(*ptr, std::move(insert), log_allocator_type::object_view(*data));
+ _stats._cas_hits++;
+ _stats._bytes += item_size(*ptr);
+ return make_ready_future<cas_result>(cas_result::stored);
+ });
}
size_t size() {
@@ -582,49 +823,123 @@ public:
}
template <typename Origin = local_origin_tag>
- std::pair<item_ptr, bool> incr(item_key& key, uint64_t delta) {
+ future<std::pair<item_data_ptr, bool>> incr(item_key& key, uint64_t delta) {
auto i = find(key);
if (i == _cache.end()) {
_stats._incr_misses++;
- return {item_ptr{}, false};
+ std::pair<item_data_ptr, bool> pair = {item_data{}, false};
+ return make_ready_future<std::pair<item_data_ptr, bool>>(std::move(pair));
}
auto& item_ref = *i;
- _stats._incr_hits++;
auto value = item_ref.data_as_integral();
if (!value) {
- return {boost::intrusive_ptr<item>(&item_ref), false};
+ _stats._incr_hits++;
+ std::pair<item_data_ptr, bool> pair = {item_data(&item_ref), false};
+ return make_ready_future<std::pair<item_data_ptr, bool>>(std::move(pair));
}
item_insertion_data insertion {
.key = Origin::move_if_local(key),
- .ascii_prefix = sstring(item_ref.ascii_prefix().data(), item_ref.ascii_prefix_size()),
+ .ascii_prefix = item_ref.ascii_prefix(),
.data = to_sstring(*value + delta),
.expiry = item_ref._expiry
};
- i = add_overriding<local_origin_tag>(i, insertion);
- return {boost::intrusive_ptr<item>(&*i), true};
+ boost::intrusive_ptr<item> ptr = &item_ref;
+ auto data_size = insertion.data.size() + insertion.ascii_prefix.size();
+ return logalloc->alloc(data_size, [this, ptr, old_value = *value] {
+ if (ptr->erased()) {
+ return std::pair<log_alloc_ptr*,
+ optional<uint64_t>>(nullptr,
+ optional<uint64_t>());
+ }
+ auto value = ptr->data_as_integral();
+ if (!value || *value != old_value) {
+ return std::pair<log_alloc_ptr*,
+ optional<uint64_t>>(nullptr, value);
+ }
+ _stats._bytes -= item_size(*ptr);
+ return std::make_pair(&ptr->reset_value(), value);
+ }, [this, ptr, delta, insert = std::move(insertion),
+ old_value = *value] (std::pair<log_alloc_ptr*, optional<uint64_t>> ret) mutable {
+ if (ptr->erased()) {
+ _stats._incr_misses++;
+ std::pair<item_data_ptr, bool> pair = {item_data{}, false};
+ return make_ready_future<std::pair<item_data_ptr, bool>>(std::move(pair));
+ }
+ if (!ret.second) {
+ _stats._incr_hits++;
+ std::pair<item_data_ptr, bool> pair = {item_data(ptr), false};
+ return make_ready_future<std::pair<item_data_ptr, bool>>(std::move(pair));
+ }
+ if (*ret.second != old_value) {
+ return incr(insert.key, delta);
+ }
+ assert(ret.first);
+ set_item(*ptr, std::move(insert), log_allocator_type::object_view(*ret.first));
+ _stats._incr_hits++;
+ _stats._bytes += item_size(*ptr);
+ std::pair<item_data_ptr, bool> pair = {item_data(ptr), true};
+ return make_ready_future<std::pair<item_data_ptr, bool>>(std::move(pair));
+ });
}
template <typename Origin = local_origin_tag>
- std::pair<item_ptr, bool> decr(item_key& key, uint64_t delta) {
+ future<std::pair<item_data_ptr, bool>> decr(item_key& key, uint64_t delta) {
auto i = find(key);
if (i == _cache.end()) {
_stats._decr_misses++;
- return {item_ptr{}, false};
+ std::pair<item_data_ptr, bool> pair = {item_data{}, false};
+ return make_ready_future<std::pair<item_data_ptr, bool>>(std::move(pair));
}
auto& item_ref = *i;
- _stats._decr_hits++;
auto value = item_ref.data_as_integral();
if (!value) {
- return {boost::intrusive_ptr<item>(&item_ref), false};
+ _stats._decr_hits++;
+ std::pair<item_data_ptr, bool> pair = {item_data(&item_ref), false};
+ return make_ready_future<std::pair<item_data_ptr, bool>>(std::move(pair));
}
item_insertion_data insertion {
.key = Origin::move_if_local(key),
- .ascii_prefix = sstring(item_ref.ascii_prefix().data(), item_ref.ascii_prefix_size()),
+ .ascii_prefix = item_ref.ascii_prefix(),
.data = to_sstring(*value - std::min(*value, delta)),
.expiry = item_ref._expiry
};
- i = add_overriding<local_origin_tag>(i, insertion);
- return {boost::intrusive_ptr<item>(&*i), true};
+ boost::intrusive_ptr<item> ptr = &item_ref;
+ auto data_size = insertion.data.size() + insertion.ascii_prefix.size();
+ return logalloc->alloc(data_size, [this, ptr, old_value = *value] {
+ if (ptr->erased()) {
+ return std::pair<log_alloc_ptr*,
+ optional<uint64_t>>(nullptr,
+ optional<uint64_t>());
+ }
+ auto value = ptr->data_as_integral();
+ if (!value || *value != old_value) {
+ return std::pair<log_alloc_ptr*,
+ optional<uint64_t>>(nullptr, value);
+ }
+ _stats._bytes -= item_size(*ptr);
+ return std::make_pair(&ptr->reset_value(), value);
+ }, [this, ptr, delta, insert = std::move(insertion),
+ old_value = *value] (std::pair<log_alloc_ptr*, optional<uint64_t>> ret) mutable {
+ if (ptr->erased()) {
+ _stats._decr_misses++;
+ std::pair<item_data_ptr, bool> pair = {item_data{}, false};
+ return make_ready_future<std::pair<item_data_ptr, bool>>(std::move(pair));
+ }
+ if (!ret.second) {
+ _stats._decr_hits++;
+ std::pair<item_data_ptr, bool> pair = {item_data(ptr), false};
+ return make_ready_future<std::pair<item_data_ptr, bool>>(std::move(pair));
+ }
+ if (*ret.second != old_value) {
+ return decr(insert.key, delta);
+ }
+ assert(ret.first);
+ set_item(*ptr, std::move(insert), log_allocator_type::object_view(*ret.first));
+ _stats._decr_hits++;
+ _stats._bytes += item_size(*ptr);
+ std::pair<item_data_ptr, bool> pair = {item_data(ptr), true};
+ return make_ready_future<std::pair<item_data_ptr, bool>>(std::move(pair));
+ });
}
std::pair<unsigned, foreign_ptr<lw_shared_ptr<std::string>>> print_hash_stats() {
@@ -671,6 +986,20 @@ public:
future<> stop() { return make_ready_future<>(); }
};
+// Drop one reference to an item. When the last reference goes away, the item
+// is unlinked from the cache's hash table and its storage is returned to
+// fixalloc.
+inline void intrusive_ptr_release(item* it) {
+ // _ref_count is unsigned, so check before decrementing. Asserting `>= 0`
+ // afterwards can never fail and would hide an over-release.
+ assert(it->_ref_count > 0);
+ --it->_ref_count;
+ if (it->_ref_count == 0) {
+ cache_ptr->remove_from_hashtable(*it);
+ fixalloc->free(it);
+ }
+}
+
+void evict_item(item& item_ref)
+{
+ cache_ptr->evict(item_ref);
+}
+
class sharded_cache {
private:
distributed<cache>& _peers;
@@ -694,7 +1023,7 @@ public:
future<bool> set(item_insertion_data& insertion) {
auto cpu = get_cpu(insertion.key);
if (engine().cpu_id() == cpu) {
- return make_ready_future<bool>(_peers.local().set(insertion));
+ return _peers.local().set(insertion);
}
return _peers.invoke_on(cpu, &cache::set<remote_origin_tag>, std::ref(insertion));
}
@@ -703,7 +1032,7 @@ public:
future<bool> add(item_insertion_data& insertion) {
auto cpu = get_cpu(insertion.key);
if (engine().cpu_id() == cpu) {
- return make_ready_future<bool>(_peers.local().add(insertion));
+ return _peers.local().add(insertion);
}
return _peers.invoke_on(cpu, &cache::add<remote_origin_tag>, std::ref(insertion));
}
@@ -712,7 +1041,7 @@ public:
future<bool> replace(item_insertion_data& insertion) {
auto cpu = get_cpu(insertion.key);
if (engine().cpu_id() == cpu) {
- return make_ready_future<bool>(_peers.local().replace(insertion));
+ return _peers.local().replace(insertion);
}
return _peers.invoke_on(cpu, &cache::replace<remote_origin_tag>, std::ref(insertion));
}
@@ -724,7 +1053,7 @@ public:
}
// The caller must keep @key live until the resulting future resolves.
- future<item_ptr> get(const item_key& key) {
+ future<item_data_ptr> get(const item_key& key) { // unlike set/add/replace/cas there is no same-shard fast path: every lookup is dispatched through invoke_on
auto cpu = get_cpu(key);
return _peers.invoke_on(cpu, &cache::get, std::ref(key));
}
@@ -733,7 +1062,7 @@ public:
future<cas_result> cas(item_insertion_data& insertion, item::version_type version) {
auto cpu = get_cpu(insertion.key);
if (engine().cpu_id() == cpu) {
- return make_ready_future<cas_result>(_peers.local().cas(insertion, version));
+ return _peers.local().cas(insertion, version); // get_cpu() picked this shard: call the local cache directly (its result is returned as-is, no make_ready_future wrapping) and skip invoke_on
}
return _peers.invoke_on(cpu, &cache::cas<remote_origin_tag>, std::ref(insertion), std::move(version));
}
@@ -743,21 +1072,19 @@ public:
}
// The caller must keep @key live until the resulting future resolves.
- future<std::pair<item_ptr, bool>> incr(item_key& key, uint64_t delta) {
+ future<std::pair<item_data_ptr, bool>> incr(item_key& key, uint64_t delta) { // explicit type like set/add/get/cas; resolves to {item data, or null if the key is missing; false if the stored value is non-numeric}
auto cpu = get_cpu(key);
if (engine().cpu_id() == cpu) {
- return make_ready_future<std::pair<item_ptr, bool>>(
- _peers.local().incr<local_origin_tag>(key, delta));
+ return _peers.local().incr<local_origin_tag>(key, delta); // same-shard fast path: the local cache already returns the future
}
return _peers.invoke_on(cpu, &cache::incr<remote_origin_tag>, std::ref(key), std::move(delta));
}
// The caller must keep @key live until the resulting future resolves.
- future<std::pair<item_ptr, bool>> decr(item_key& key, uint64_t delta) {
+ future<std::pair<item_data_ptr, bool>> decr(item_key& key, uint64_t delta) { // explicit type like set/add/get/cas; resolves to {item data, or null if the key is missing; false if the stored value is non-numeric}
auto cpu = get_cpu(key);
if (engine().cpu_id() == cpu) {
- return make_ready_future<std::pair<item_ptr, bool>>(
- _peers.local().decr(key, delta));
+ return _peers.local().decr<local_origin_tag>(key, delta); // same-shard fast path: the local cache already returns the future
}
return _peers.invoke_on(cpu, &cache::decr<remote_origin_tag>, std::ref(key), std::move(delta));
}
@@ -808,7 +1135,7 @@ private:
memcache_ascii_parser _parser;
item_key _item_key;
item_insertion_data _insertion;
- std::vector<item_ptr> _items;
+ std::vector<item_data_ptr> _items;
private:
static constexpr const char *msg_crlf = "\r\n";
static constexpr const char *msg_error = "ERROR\r\n";
@@ -826,24 +1153,29 @@ private:
static constexpr const char *msg_error_non_numeric_value = "CLIENT_ERROR cannot increment or decrement non-numeric value\r\n";
private:
template <bool WithVersion>
- static void append_item(scattered_message<char>& msg, item_ptr item) {
- if (!item) {
+ static void append_item(scattered_message<char>& msg, item_data_ptr idata) { // emits one "VALUE <key><prefix>[ <version>]\r\n<value>\r\n" entry into msg
+ if (!idata) { // null entry: emit nothing
return;
}
-
msg.append_static("VALUE ");
- msg.append_static(item->key());
- msg.append_static(item->ascii_prefix());
+ for (auto&& frag : idata->key()) { // the key is exposed as a sequence of iov_base/iov_len fragments
+ msg.append_static(static_cast<char*>(frag.iov_base), frag.iov_len);
+ }
+ idata->iterate_prefix([&] (void* chunk, size_t chunk_len) {
+ msg.append_static(static_cast<char*>(chunk), chunk_len);
+ });
if (WithVersion) {
msg.append_static(" ");
- msg.append(to_sstring(item->version()));
+ msg.append(to_sstring(idata->parent().version())); // the version is read from idata's parent item
}
msg.append_static(msg_crlf);
- msg.append_static(item->value());
+ idata->iterate_value([&] (void* chunk, size_t chunk_len) {
+ msg.append_static(static_cast<char*>(chunk), chunk_len);
+ });
msg.append_static(msg_crlf);
- msg.on_delete([item = std::move(item)] {});
+ msg.on_delete([pinned = std::move(idata)] {}); // the appended fragments point into idata's storage, so the message owns it until destroyed
}
template <bool WithVersion>
@@ -1106,17 +1438,21 @@ public:
return std::move(f).discard_result();
}
return std::move(f).then([&out] (auto result) {
- auto item = std::move(result.first);
- if (!item) {
+ auto idata = std::move(result.first);
+ if (!idata) {
return out.write(msg_not_found);
}
auto incremented = result.second;
if (!incremented) {
return out.write(msg_error_non_numeric_value);
}
- return out.write(item->value().data(), item->value_size()).then([&out] {
- return out.write(msg_crlf);
+ scattered_message<char> msg;
+ idata->iterate_value([&msg] (void* ptr, size_t len) {
+ msg.append_static(static_cast<char*>(ptr), len);
});
+ msg.append_static(msg_crlf);
+ msg.on_delete([idata = std::move(idata)] { });
+ return out.write(std::move(msg));
});
}
@@ -1127,17 +1463,21 @@ public:
return std::move(f).discard_result();
}
return std::move(f).then([&out] (auto result) {
- auto item = std::move(result.first);
- if (!item) {
+ auto idata = std::move(result.first);
+ if (!idata) {
return out.write(msg_not_found);
}
auto decremented = result.second;
if (!decremented) {
return out.write(msg_error_non_numeric_value);
}
- return out.write(item->value().data(), item->value_size()).then([&out] {
- return out.write(msg_crlf);
+ scattered_message<char> msg;
+ idata->iterate_value([&msg] (void* ptr, size_t len) {
+ msg.append_static(static_cast<char*>(ptr), len);
});
+ msg.append_static(msg_crlf);
+ msg.on_delete([idata = std::move(idata)] { });
+ return out.write(std::move(msg));
});
}
};
@@ -1356,10 +1696,6 @@ int main(int ac, char** av) {
app.add_options()
("max-datagram-size", bpo::value<int>()->default_value(memcache::udp_server::default_max_datagram_size),
"Maximum size of UDP datagram")
- ("max-slab-size", bpo::value<uint64_t>()->default_value(memcache::default_per_cpu_slab_size/MB),
- "Maximum memory to be used for items (value in megabytes) (reclaimer is disabled if set)")
- ("slab-page-size", bpo::value<uint64_t>()->default_value(memcache::default_slab_page_size/MB),
- "Size of slab page (value in megabytes)")
("stats",
"Print basic statistics periodically (every second)")
("port", bpo::value<uint16_t>()->default_value(11211),
@@ -1374,9 +1710,7 @@ int main(int ac, char** av) {
auto&& config = app.configuration();
uint16_t port = config["port"].as<uint16_t>();
- uint64_t per_cpu_slab_size = config["max-slab-size"].as<uint64_t>() * MB;
- uint64_t slab_page_size = config["slab-page-size"].as<uint64_t>() * MB;
- return cache_peers.start(std::move(per_cpu_slab_size), std::move(slab_page_size)).then([&system_stats] {
+ return cache_peers.start().then([&system_stats] {
return system_stats.start(clock_type::now());
}).then([&] {
std::cout << PLATFORM << " memcached " << VERSION << "\n";
--
2.1.4