[PATCH 0/4] Time window compaction strategy support

210 views
Skip to first unread message

Raphael S. Carvalho

<raphaelsc@scylladb.com>
unread,
Jul 17, 2017, 1:55:35 AM7/17/17
to scylladb-dev@googlegroups.com, Raphael S. Carvalho
Time window strategy was introduced to address several limitations of
date tiered strategy. In addition, its options are much easier to reason
about, basically just window size and window unit.
TWCS will work to keep only one sstable in each window. So the only real
optimization needed is to align partition key to the window.
Size tiered strategy is used to reduce write amplification when compacting
the incoming window.

For more details: https://issues.apache.org/jira/browse/CASSANDRA-9666
Fixes #1432.

TODO: override default values with options provided in schema
TODO: more testing
TODO: compare TWCS and DTCS IO/CPU usage

Raphael S. Carvalho (4):
sstables: import TimeWindowCompactionStrategy.java
sstables: implement time window compaction strategy
compaction: wire up time window compaction strategy
tests: add tests for time window compaction strategy

compaction_strategy.hh | 5 +
sstables/time_window_compaction_strategy.hh | 258 ++++++++++++++++++++++++++++
sstables/compaction_strategy.cc | 4 +
tests/sstable_datafile_test.cc | 121 +++++++++++++
4 files changed, 388 insertions(+)
create mode 100644 sstables/time_window_compaction_strategy.hh

--
2.9.4

Raphael S. Carvalho

<raphaelsc@scylladb.com>
unread,
Jul 17, 2017, 1:55:38 AM7/17/17
to scylladb-dev@googlegroups.com, Raphael S. Carvalho
It will later be converted to C++. Imported from the latest scylla-tools-java
repository; checked that nothing is missing from it.

Signed-off-by: Raphael S. Carvalho <raph...@scylladb.com>
---
sstables/TimeWindowCompactionStrategy.java | 380 +++++++++++++++++++++++++++++
1 file changed, 380 insertions(+)
create mode 100644 sstables/TimeWindowCompactionStrategy.java

diff --git a/sstables/TimeWindowCompactionStrategy.java b/sstables/TimeWindowCompactionStrategy.java
new file mode 100644
index 0000000..e2ab7dc
--- /dev/null
+++ b/sstables/TimeWindowCompactionStrategy.java
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.lifecycle.SSTableSet;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.CompactionParams;
+import org.apache.cassandra.utils.Pair;
+
+import static com.google.common.collect.Iterables.filter;
+
+public class TimeWindowCompactionStrategy extends AbstractCompactionStrategy
+{
+ private static final Logger logger = LoggerFactory.getLogger(TimeWindowCompactionStrategy.class);
+
+ // Options parsed from the strategy's option map in the constructor.
+ private final TimeWindowCompactionStrategyOptions options;
+ // Written by updateEstimatedCompactionsByTasks(), read by getEstimatedRemainingTasks().
+ protected volatile int estimatedRemainingTasks;
+ // SSTables registered through addSSTable() and not yet removed through removeSSTable().
+ private final Set<SSTableReader> sstables = new HashSet<>();
+ // System.currentTimeMillis() at the last check for fully expired sstables.
+ private long lastExpiredCheck;
+ // Highest window lower bound (as returned by getBuckets()) observed so far.
+ private long highestWindowSeen;
+
+ /**
+ * Creates the strategy and parses its options.
+ *
+ * Tombstone compactions are disabled unless the caller explicitly provided at least one of
+ * the tombstone compaction interval or tombstone threshold options.
+ *
+ * @param cfs the column family store, forwarded to the superclass
+ * @param options the raw strategy options
+ */
+ public TimeWindowCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options)
+ {
+ super(cfs, options);
+ this.estimatedRemainingTasks = 0;
+ this.options = new TimeWindowCompactionStrategyOptions(options);
+ if (!options.containsKey(AbstractCompactionStrategy.TOMBSTONE_COMPACTION_INTERVAL_OPTION) && !options.containsKey(AbstractCompactionStrategy.TOMBSTONE_THRESHOLD_OPTION))
+ {
+ disableTombstoneCompactions = true;
+ logger.debug("Disabling tombstone compactions for TWCS");
+ }
+ else
+ logger.debug("Enabling tombstone compactions for TWCS");
+
+ }
+
+ /**
+ * Picks the next background compaction.
+ *
+ * Keeps asking for a candidate bucket until either no candidate is left (returns null) or
+ * the tracker accepts a transaction over the whole bucket.
+ *
+ * @param gcBefore forwarded to candidate selection and to the created task
+ * @return a compaction task over the selected sstables, or null if there is nothing to do
+ */
+ @Override
+ public AbstractCompactionTask getNextBackgroundTask(int gcBefore)
+ {
+ for (;;)
+ {
+ List<SSTableReader> candidates = getNextBackgroundSSTables(gcBefore);
+ if (candidates.isEmpty())
+ return null;
+
+ LifecycleTransaction txn = cfs.getTracker().tryModify(candidates, OperationType.COMPACTION);
+ if (txn == null)
+ continue; // tracker refused the transaction; select candidates again
+
+ return new CompactionTask(cfs, txn, gcBefore);
+ }
+ }
+
+ /**
+ * Selects the sstables for the next background compaction.
+ *
+ * Only sstables that are both tracked by this strategy and currently uncompacting are
+ * considered. At most once per options.expiredSSTableCheckFrequency milliseconds the fully
+ * expired sstables are looked up; when found they are appended to whatever the regular
+ * selection returned.
+ *
+ * @param gcBefore passed to the fully-expired check and to the non-expired selection
+ * @return the sstables to compact; empty if there is nothing to do
+ */
+ private synchronized List<SSTableReader> getNextBackgroundSSTables(final int gcBefore)
+ {
+ if (Iterables.isEmpty(cfs.getSSTables(SSTableSet.LIVE)))
+ return Collections.emptyList();
+
+ // Restrict to sstables this strategy instance knows about.
+ Set<SSTableReader> uncompacting = ImmutableSet.copyOf(filter(cfs.getUncompactingSSTables(), sstables::contains));
+
+ // Find fully expired SSTables. Those will be included no matter what.
+ Set<SSTableReader> expired = Collections.emptySet();
+
+ if (System.currentTimeMillis() - lastExpiredCheck > options.expiredSSTableCheckFrequency)
+ {
+ logger.debug("TWCS expired check sufficiently far in the past, checking for fully expired SSTables");
+ expired = CompactionController.getFullyExpiredSSTables(cfs, uncompacting, cfs.getOverlappingLiveSSTables(uncompacting), gcBefore);
+ lastExpiredCheck = System.currentTimeMillis();
+ }
+ else
+ {
+ logger.debug("TWCS skipping check for fully expired SSTables");
+ }
+
+ Set<SSTableReader> candidates = Sets.newHashSet(filterSuspectSSTables(uncompacting));
+
+ // Expired sstables are excluded from the regular selection and added back afterwards.
+ List<SSTableReader> compactionCandidates = new ArrayList<>(getNextNonExpiredSSTables(Sets.difference(candidates, expired), gcBefore));
+ if (!expired.isEmpty())
+ {
+ logger.debug("Including expired sstables: {}", expired);
+ compactionCandidates.addAll(expired);
+ }
+
+ return compactionCandidates;
+ }
+
+ /**
+ * Selects sstables to compact among the ones that are not fully expired.
+ *
+ * First tries the regular window-based selection. If that yields nothing, falls back to a
+ * single-sstable compaction of one sstable for which worthDroppingTombstones() is true.
+ *
+ * @param nonExpiringSSTables candidate sstables, already stripped of fully expired ones
+ * @param gcBefore passed to worthDroppingTombstones()
+ * @return the sstables to compact; empty if none qualify
+ */
+ private List<SSTableReader> getNextNonExpiredSSTables(Iterable<SSTableReader> nonExpiringSSTables, final int gcBefore)
+ {
+ List<SSTableReader> mostInteresting = getCompactionCandidates(nonExpiringSSTables);
+
+ // getCompactionCandidates() returns null (not an empty list) when nothing was selected.
+ if (mostInteresting != null)
+ {
+ return mostInteresting;
+ }
+
+ // if there is no sstable to compact in standard way, try compacting single sstable whose droppable tombstone
+ // ratio is greater than threshold.
+ List<SSTableReader> sstablesWithTombstones = new ArrayList<>();
+ for (SSTableReader sstable : nonExpiringSSTables)
+ {
+ if (worthDroppingTombstones(sstable, gcBefore))
+ sstablesWithTombstones.add(sstable);
+ }
+ if (sstablesWithTombstones.isEmpty())
+ return Collections.emptyList();
+
+ // Minimum under SSTableReader.SizeComparator -- presumably the smallest sstable; confirm against the comparator.
+ return Collections.singletonList(Collections.min(sstablesWithTombstones, new SSTableReader.SizeComparator()));
+ }
+
+ /**
+ * Buckets the candidates by time window, refreshes the highest window seen and the pending
+ * task estimate, and asks newestBucket() for the bucket to compact.
+ *
+ * @param candidateSSTables sstables eligible for compaction
+ * @return the selected sstables, or null when no bucket qualifies
+ */
+ private List<SSTableReader> getCompactionCandidates(Iterable<SSTableReader> candidateSSTables)
+ {
+ Pair<HashMultimap<Long, SSTableReader>, Long> bucketsAndNewest = getBuckets(candidateSSTables, options.sstableWindowUnit, options.sstableWindowSize, options.timestampResolution);
+ HashMultimap<Long, SSTableReader> windows = bucketsAndNewest.left;
+
+ // The highest window seen only ever moves forward.
+ this.highestWindowSeen = Math.max(this.highestWindowSeen, bucketsAndNewest.right);
+
+ updateEstimatedCompactionsByTasks(windows);
+
+ int minThreshold = cfs.getMinimumCompactionThreshold();
+ int maxThreshold = cfs.getMaximumCompactionThreshold();
+ List<SSTableReader> picked = newestBucket(windows,
+ minThreshold,
+ maxThreshold,
+ options.sstableWindowUnit,
+ options.sstableWindowSize,
+ options.stcsOptions,
+ this.highestWindowSeen);
+ return picked.isEmpty() ? null : picked;
+ }
+
+ /**
+ * Starts tracking an sstable.
+ *
+ * Synchronized because the backing HashSet is read under this object's monitor by
+ * getNextBackgroundSSTables() and getMaximalTask(); an unsynchronized write would race
+ * with those readers.
+ */
+ @Override
+ public synchronized void addSSTable(SSTableReader sstable)
+ {
+ sstables.add(sstable);
+ }
+
+ /**
+ * Stops tracking an sstable. Synchronized for the same reason as addSSTable().
+ */
+ @Override
+ public synchronized void removeSSTable(SSTableReader sstable)
+ {
+ sstables.remove(sstable);
+ }
+
+ /**
+ * Find the lowest and highest timestamps of the window that contains the given timestamp.
+ * Returns milliseconds, caller should adjust accordingly
+ *
+ * @param windowTimeUnit unit of the window; MINUTES and HOURS are handled explicitly,
+ * anything else is treated as DAYS
+ * @param windowTimeSize number of units per window
+ * @param timestampInMillis timestamp to locate, in milliseconds
+ * @return pair of (first millisecond of the window's first second,
+ * first millisecond of the window's last second)
+ */
+ public static Pair<Long,Long> getWindowBoundsInMillis(TimeUnit windowTimeUnit, int windowTimeSize, long timestampInMillis)
+ {
+ long timestampInSeconds = TimeUnit.SECONDS.convert(timestampInMillis, TimeUnit.MILLISECONDS);
+
+ final long unitInSeconds;
+ switch(windowTimeUnit)
+ {
+ case MINUTES:
+ unitInSeconds = 60L;
+ break;
+ case HOURS:
+ unitInSeconds = 3600L;
+ break;
+ case DAYS:
+ default:
+ unitInSeconds = 86400L;
+ break;
+ }
+
+ // Window length is computed in long arithmetic so that a large windowTimeSize cannot
+ // overflow int before the modulo is taken.
+ long windowInSeconds = unitInSeconds * windowTimeSize;
+ long lowerTimestamp = timestampInSeconds - (timestampInSeconds % windowInSeconds);
+ // Last second that still belongs to the window.
+ long upperTimestamp = lowerTimestamp + windowInSeconds - 1L;
+
+ return Pair.create(TimeUnit.MILLISECONDS.convert(lowerTimestamp, TimeUnit.SECONDS),
+ TimeUnit.MILLISECONDS.convert(upperTimestamp, TimeUnit.SECONDS));
+
+ }
+
+ /**
+ * Group files with similar max timestamp into buckets.
+ *
+ * @param files the sstables to group; each one is placed according to its max timestamp
+ * @param sstableWindowUnit unit of the time window
+ * @param sstableWindowSize number of units per time window
+ * @param timestampResolution unit in which the sstables' timestamps are expressed
+ * @return A pair, where the left element is the bucket representation (map of window lower bound in
+ * milliseconds to sstablereader), and the right is the highest window lower bound seen (0 if there are no files)
+ */
+ @VisibleForTesting
+ static Pair<HashMultimap<Long, SSTableReader>, Long> getBuckets(Iterable<SSTableReader> files, TimeUnit sstableWindowUnit, int sstableWindowSize, TimeUnit timestampResolution)
+ {
+ HashMultimap<Long, SSTableReader> buckets = HashMultimap.create();
+
+ long maxTimestamp = 0;
+ // Create hash map to represent buckets
+ // For each sstable, add sstable to the time bucket
+ // Where the bucket is the lower bound of the window containing the file's max timestamp
+ for (SSTableReader f : files)
+ {
+ assert TimeWindowCompactionStrategyOptions.validTimestampTimeUnits.contains(timestampResolution);
+ long tStamp = TimeUnit.MILLISECONDS.convert(f.getMaxTimestamp(), timestampResolution);
+ Pair<Long,Long> bounds = getWindowBoundsInMillis(sstableWindowUnit, sstableWindowSize, tStamp);
+ buckets.put(bounds.left, f);
+ if (bounds.left > maxTimestamp)
+ maxTimestamp = bounds.left;
+ }
+
+ // Two placeholders for two arguments; previously maxTimestamp was silently dropped from the message.
+ logger.trace("buckets {}, max timestamp {}", buckets, maxTimestamp);
+ return Pair.create(buckets, maxTimestamp);
+ }
+
+ private void updateEstimatedCompactionsByTasks(HashMultimap<Long, SSTableReader> tasks)
+ {
+ int n = 0;
+ long now = this.highestWindowSeen;
+
+ for(Long key : tasks.keySet())
+ {
+ // For current window, make sure it's compactable
+ if (key.compareTo(now) >= 0 && tasks.get(key).size() >= cfs.getMinimumCompactionThreshold())
+ n++;
+ else if (key.compareTo(now) < 0 && tasks.get(key).size() >= 2)
+ n++;
+ }
+ this.estimatedRemainingTasks = n;
+ }
+
+
+ /**
+ * Walks the buckets from the newest window to the oldest and returns the first one worth compacting.
+ *
+ * @param buckets sstables keyed by window lower bound; the keys are sorted inside this method.
+ * @param minThreshold minimum number of sstables a bucket at or after {@code now} needs to qualify.
+ * @param maxThreshold maximum number of sstables to compact at once (the returned bucket will be trimmed down to this).
+ * @param sstableWindowUnit not referenced by the method body.
+ * @param sstableWindowSize not referenced by the method body.
+ * @param stcsOptions size-tiered options used to pick sstables inside a bucket at or after {@code now}.
+ * @param now lower bound of the window regarded as current; callers in this class pass the highest window seen.
+ * @return a bucket (list) of sstables to compact; empty if no bucket qualifies.
+ */
+ @VisibleForTesting
+ static List<SSTableReader> newestBucket(HashMultimap<Long, SSTableReader> buckets, int minThreshold, int maxThreshold, TimeUnit sstableWindowUnit, int sstableWindowSize, SizeTieredCompactionStrategyOptions stcsOptions, long now)
+ {
+ // If the current bucket has at least minThreshold SSTables, choose that one.
+ // For any other bucket, at least 2 SSTables is enough.
+ // In any case, limit to maxThreshold SSTables.
+
+ TreeSet<Long> allKeys = new TreeSet<>(buckets.keySet());
+
+ // Newest window first.
+ Iterator<Long> it = allKeys.descendingIterator();
+ while(it.hasNext())
+ {
+ Long key = it.next();
+ Set<SSTableReader> bucket = buckets.get(key);
+ logger.trace("Key {}, now {}", key, now);
+ if (bucket.size() >= minThreshold && key >= now)
+ {
+ // If we're in the newest bucket, we'll use STCS to prioritize sstables
+ List<Pair<SSTableReader,Long>> pairs = SizeTieredCompactionStrategy.createSSTableAndLengthPairs(bucket);
+ List<List<SSTableReader>> stcsBuckets = SizeTieredCompactionStrategy.getBuckets(pairs, stcsOptions.bucketHigh, stcsOptions.bucketLow, stcsOptions.minSSTableSize);
+ logger.debug("Using STCS compaction for first window of bucket: data files {} , options {}", pairs, stcsOptions);
+ List<SSTableReader> stcsInterestingBucket = SizeTieredCompactionStrategy.mostInterestingBucket(stcsBuckets, minThreshold, maxThreshold);
+
+ // If the tables in the current bucket aren't eligible in the STCS strategy, we'll skip it and look for other buckets
+ if (!stcsInterestingBucket.isEmpty())
+ return stcsInterestingBucket;
+ }
+ else if (bucket.size() >= 2 && key < now)
+ {
+ logger.debug("bucket size {} >= 2 and not in current bucket, compacting what's here: {}", bucket.size(), bucket);
+ return trimToThreshold(bucket, maxThreshold);
+ }
+ else
+ {
+ logger.debug("No compaction necessary for bucket size {} , key {}, now {}", bucket.size(), key, now);
+ }
+ }
+ return Collections.<SSTableReader>emptyList();
+ }
+
+ /**
+ * @param bucket set of sstables
+ * @param maxThreshold maximum number of sstables in a single compaction task.
+ * @return an immutable list holding at most maxThreshold sstables: the first ones after sorting the
+ * bucket with SSTableReader.SizeComparator (presumably the smallest -- confirm the comparator's order).
+ */
+ @VisibleForTesting
+ static List<SSTableReader> trimToThreshold(Set<SSTableReader> bucket, int maxThreshold)
+ {
+ List<SSTableReader> ssTableReaders = new ArrayList<>(bucket);
+
+ // Trim the largest sstables off the end to meet the maxThreshold
+ Collections.sort(ssTableReaders, new SSTableReader.SizeComparator());
+
+ return ImmutableList.copyOf(Iterables.limit(ssTableReaders, maxThreshold));
+ }
+
+ /**
+ * Builds one compaction task covering every tracked sstable that passes filterSuspectSSTables().
+ *
+ * @param gcBefore forwarded to the created task
+ * @param splitOutput not referenced by this implementation
+ * @return a singleton collection with the task, or null if there is no sstable left after
+ * filtering or the tracker refuses the transaction
+ */
+ @Override
+ public synchronized Collection<AbstractCompactionTask> getMaximalTask(int gcBefore, boolean splitOutput)
+ {
+ Iterable<SSTableReader> filteredSSTables = filterSuspectSSTables(sstables);
+ if (Iterables.isEmpty(filteredSSTables))
+ return null;
+ LifecycleTransaction txn = cfs.getTracker().tryModify(filteredSSTables, OperationType.COMPACTION);
+ if (txn == null)
+ return null;
+ return Collections.singleton(new CompactionTask(cfs, txn, gcBefore));
+ }
+
+ /**
+ * Builds a user-defined compaction task over exactly the given sstables.
+ *
+ * @param sstables the sstables to compact; must not be empty (note: shadows the field of the same name)
+ * @param gcBefore forwarded to the created task
+ * @return the task flagged as user defined, or null if the tracker refuses the transaction
+ */
+ @Override
+ public synchronized AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)
+ {
+ assert !sstables.isEmpty(); // checked for by CM.submitUserDefined
+
+ LifecycleTransaction modifier = cfs.getTracker().tryModify(sstables, OperationType.COMPACTION);
+ if (modifier == null)
+ {
+ logger.debug("Unable to mark {} for compaction; probably a background compaction got to it first. You can disable background compactions temporarily if this is a problem", sstables);
+ return null;
+ }
+
+ return new CompactionTask(cfs, modifier, gcBefore).setUserDefined(true);
+ }
+
+ /**
+ * @return the estimate last stored by updateEstimatedCompactionsByTasks(); 0 until that has run
+ */
+ public int getEstimatedRemainingTasks()
+ {
+ return this.estimatedRemainingTasks;
+ }
+
+ /**
+ * @return Long.MAX_VALUE, i.e. this strategy reports no upper bound on sstable size
+ */
+ public long getMaxSSTableBytes()
+ {
+ return Long.MAX_VALUE;
+ }
+
+
+ /**
+ * Validates the strategy options by chaining the generic and the TWCS-specific validators.
+ *
+ * @param options the raw strategy options
+ * @return the map returned by the validators with the min/max compaction threshold keys removed
+ * (presumably the options no validator recognised -- confirm against the validators)
+ * @throws ConfigurationException declared by this method; presumably raised by the validators for invalid values
+ */
+ public static Map<String, String> validateOptions(Map<String, String> options) throws ConfigurationException
+ {
+ Map<String, String> uncheckedOptions = AbstractCompactionStrategy.validateOptions(options);
+ uncheckedOptions = TimeWindowCompactionStrategyOptions.validateOptions(options, uncheckedOptions);
+
+ // The thresholds are legitimate options for this strategy, so do not report them as unchecked.
+ uncheckedOptions.remove(CompactionParams.Option.MIN_THRESHOLD.toString());
+ uncheckedOptions.remove(CompactionParams.Option.MAX_THRESHOLD.toString());
+
+ return uncheckedOptions;
+ }
+
+ /**
+ * @return the strategy name followed by the table's minimum and maximum compaction thresholds
+ */
+ @Override
+ public String toString()
+ {
+ return String.format("TimeWindowCompactionStrategy[%s/%s]",
+ cfs.getMinimumCompactionThreshold(),
+ cfs.getMaximumCompactionThreshold());
+ }
+}
--
2.9.4

Raphael S. Carvalho

<raphaelsc@scylladb.com>
unread,
Jul 17, 2017, 1:55:41 AM7/17/17
to scylladb-dev@googlegroups.com, Raphael S. Carvalho
For more details, https://issues.apache.org/jira/browse/CASSANDRA-9666

Fixes #1432.

Signed-off-by: Raphael S. Carvalho <raph...@scylladb.com>
---
sstables/time_window_compaction_strategy.hh | 258 +++++++++++++++++++
sstables/TimeWindowCompactionStrategy.java | 380 ----------------------------
2 files changed, 258 insertions(+), 380 deletions(-)
create mode 100644 sstables/time_window_compaction_strategy.hh
delete mode 100644 sstables/TimeWindowCompactionStrategy.java

diff --git a/sstables/time_window_compaction_strategy.hh b/sstables/time_window_compaction_strategy.hh
new file mode 100644
index 0000000..f213c01
--- /dev/null
+++ b/sstables/time_window_compaction_strategy.hh
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Copyright (C) 2017 ScyllaDB
+ *
+ * Modified by ScyllaDB
+ */
+
+/*
+ * This file is part of Scylla.
+ *
+ * Scylla is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Scylla is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Scylla. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include "compaction_strategy_impl.hh"
+#include "compaction.hh"
+#include "timestamp.hh"
+#include <boost/range/algorithm/partial_sort.hpp>
+
+namespace sstables {
+
+extern logging::logger clogger;
+
+// Tunables of the time window compaction strategy. Only defaults are available for now.
+struct time_window_compaction_strategy_options {
+ // Despite the name, returns the length of a window made of window_size units, where the default unit is one day (86400 seconds).
+ static constexpr std::chrono::seconds DEFAULT_COMPACTION_WINDOW_UNIT(int window_size) { return std::chrono::seconds(86400*window_size); }
+ // Default number of units per window.
+ static constexpr int DEFAULT_COMPACTION_WINDOW_SIZE = 1;
+ // Default minimum interval between two checks for fully expired sstables: 10 minutes.
+ static constexpr std::chrono::seconds DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS() { return std::chrono::seconds(60*10); }
+
+ // Total window length (unit multiplied by size).
+ std::chrono::seconds sstable_window_size = DEFAULT_COMPACTION_WINDOW_UNIT(DEFAULT_COMPACTION_WINDOW_SIZE);
+ // Minimum interval between two checks for fully expired sstables.
+ db_clock::duration expired_sstable_check_frequency = DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS();
+
+ // FIXME: override default values with the ones in schema.
+};
+
+using timestamp_type = api::timestamp_type;
+
+class time_window_compaction_strategy : public compaction_strategy_impl {
+ time_window_compaction_strategy_options _options;
+ // Last estimate computed by update_estimated_compaction_by_tasks().
+ int64_t _estimated_remaining_tasks = 0;
+ // When fully expired sstables were last looked for; default-constructed, so the first check always runs.
+ db_clock::time_point _last_expired_check;
+ // Highest window lower bound seen so far. Must start at 0: get_compaction_candidates() reads it
+ // through std::max() before ever assigning it, so leaving it uninitialized is undefined behaviour.
+ timestamp_type _highest_window_seen = 0;
+public:
+ // Builds the strategy from the table's compaction options.
+ // Tombstone compaction is disabled unless the user explicitly set at least one of the
+ // tombstone compaction interval / tombstone threshold options.
+ time_window_compaction_strategy(const std::map<sstring, sstring>& options)
+ : compaction_strategy_impl(options)
+ {
+ if (!options.count(TOMBSTONE_COMPACTION_INTERVAL_OPTION) && !options.count(TOMBSTONE_THRESHOLD_OPTION)) {
+ _disable_tombstone_compaction = true;
+ clogger.debug("Disabling tombstone compactions for TWCS");
+ } else {
+ clogger.debug("Enabling tombstone compactions for TWCS");
+ }
+ _use_clustering_key_filter = true;
+ }
+
+ // Selects the sstables for the next compaction among candidates.
+ // Fully expired sstables are looked up at most once per expired_sstable_check_frequency; when
+ // found they are removed from the regular selection input and appended to its result.
+ // Returns an empty descriptor when there are no candidates.
+ virtual compaction_descriptor get_sstables_for_compaction(column_family& cf, std::vector<shared_sstable> candidates) override {
+ auto gc_before = gc_clock::now() - cf.schema()->gc_grace_seconds();
+
+ if (candidates.empty()) {
+ return compaction_descriptor();
+ }
+
+ // Find fully expired SSTables. Those will be included no matter what.
+ std::vector<shared_sstable> expired;
+
+ if (db_clock::now() - _last_expired_check > _options.expired_sstable_check_frequency) {
+ clogger.debug("TWCS expired check sufficiently far in the past, checking for fully expired SSTables");
+ expired = get_fully_expired_sstables(cf, candidates, gc_before.time_since_epoch().count());
+ _last_expired_check = db_clock::now();
+ } else {
+ clogger.debug("TWCS skipping check for fully expired SSTables");
+ }
+
+ std::vector<shared_sstable> non_expired;
+
+ if (!expired.empty()) {
+ // set_difference requires both ranges to be sorted with the same ordering; generation is used as the key.
+ auto cmp = [] (const shared_sstable& x, const shared_sstable& y) {
+ return x->generation() < y->generation();
+ };
+ boost::range::sort(candidates, cmp);
+ boost::range::sort(expired, cmp);
+ boost::set_difference(candidates, expired, std::back_inserter(non_expired), cmp);
+ } else {
+ non_expired = std::move(candidates);
+ }
+
+ auto compaction_candidates = get_next_non_expired_sstables(cf, std::move(non_expired), gc_before);
+ if (!expired.empty()) {
+ compaction_candidates.insert(compaction_candidates.end(), expired.begin(), expired.end());
+ }
+ // The vector is converted to the compaction_descriptor return type here.
+ return compaction_candidates;
+ }
+private:
+ // Selects sstables to compact among the ones that are not fully expired.
+ // Tries the window-based selection first; if it yields nothing, falls back to compacting a
+ // single sstable for which worth_dropping_tombstones() holds. Returns an empty vector if none qualifies.
+ std::vector<shared_sstable>
+ get_next_non_expired_sstables(column_family& cf, std::vector<shared_sstable> non_expiring_sstables, gc_clock::time_point gc_before) {
+ auto most_interesting = get_compaction_candidates(cf, non_expiring_sstables);
+
+ if (!most_interesting.empty()) {
+ return most_interesting;
+ }
+
+ // if there is no sstable to compact in standard way, try compacting single sstable whose droppable tombstone
+ // ratio is greater than threshold.
+ auto e = boost::range::remove_if(non_expiring_sstables, [this, &gc_before] (const shared_sstable& sst) -> bool {
+ return !worth_dropping_tombstones(sst, gc_before);
+ });
+ non_expiring_sstables.erase(e, non_expiring_sstables.end());
+ if (non_expiring_sstables.empty()) {
+ return {};
+ }
+ // Pick the sstable with the lowest min_timestamp.
+ // NOTE(review): the imported Java code picks the minimum by size here instead -- confirm the difference is intended.
+ auto it = boost::min_element(non_expiring_sstables, [] (auto& i, auto& j) {
+ return i->get_stats_metadata().min_timestamp < j->get_stats_metadata().min_timestamp;
+ });
+ return { *it };
+ }
+
+ // Buckets the candidates by time window, refreshes _highest_window_seen and the pending task
+ // estimate, then returns the bucket chosen by newest_bucket() (empty if none qualifies).
+ std::vector<shared_sstable> get_compaction_candidates(column_family& cf, std::vector<shared_sstable> candidate_sstables) {
+ auto p = get_buckets(std::move(candidate_sstables), _options.sstable_window_size);
+ // Update the highest window seen, if necessary
+ _highest_window_seen = std::max(_highest_window_seen, p.second);
+
+ update_estimated_compaction_by_tasks(p.first, cf.schema()->min_compaction_threshold());
+
+ return newest_bucket(std::move(p.first), cf.schema()->min_compaction_threshold(), cf.schema()->max_compaction_threshold(),
+ _options.sstable_window_size, _highest_window_seen);
+ }
+public:
+ // Find the lowest timestamp for window of given size
+ // The timestamp is interpreted as microseconds, truncated to whole seconds, rounded down to a
+ // multiple of the window size, and returned in microseconds again.
+ static timestamp_type
+ get_window_lower_bound(std::chrono::seconds sstable_window_size, timestamp_type timestamp) {
+ using namespace std::chrono;
+ auto timestamp_in_sec = duration_cast<seconds>(microseconds(timestamp)).count();
+
+ // mask out window size from timestamp to get lower bound of its window
+ auto window_lower_bound_in_sec = seconds(timestamp_in_sec - (timestamp_in_sec % sstable_window_size.count()));
+
+ return timestamp_type(duration_cast<microseconds>(window_lower_bound_in_sec).count());
+ }
+
+ // Group files with similar max timestamp into buckets.
+ // @return A pair, where the left element is the bucket representation (map of window lower bound to sstables),
+ // and the right is the highest window lower bound seen (0 if files is empty)
+ static std::pair<std::map<timestamp_type, std::vector<shared_sstable>>, timestamp_type>
+ get_buckets(std::vector<shared_sstable> files, std::chrono::seconds sstable_window_size) {
+ std::map<timestamp_type, std::vector<shared_sstable>> buckets;
+
+ timestamp_type max_timestamp = 0;
+ // Create map to represent buckets
+ // For each sstable, add sstable to the time bucket
+ // Where the bucket is the lower bound of the window containing the file's max timestamp
+ for (auto&& f : files) {
+ timestamp_type ts = f->get_stats_metadata().max_timestamp;
+ timestamp_type lower_bound = get_window_lower_bound(sstable_window_size, ts);
+ buckets[lower_bound].push_back(std::move(f));
+ max_timestamp = std::max(max_timestamp, lower_bound);
+ }
+
+ return std::make_pair(std::move(buckets), max_timestamp);
+ }
+
+ // Walks the buckets from the newest window to the oldest and returns the first one worth
+ // compacting; returns an empty vector if none qualifies.
+ // now is the lower bound of the window regarded as current. sstable_window_size is not
+ // referenced by the body.
+ static std::vector<shared_sstable>
+ newest_bucket(std::map<timestamp_type, std::vector<shared_sstable>> buckets, int min_threshold, int max_threshold,
+ std::chrono::seconds sstable_window_size, timestamp_type now) {
+ // If the current bucket has at least minThreshold SSTables, choose that one.
+ // For any other bucket, at least 2 SSTables is enough.
+ // In any case, limit to maxThreshold SSTables.
+
+ // std::map is ordered by key, so reverse iteration visits the newest window first.
+ for (auto it = buckets.rbegin(); it != buckets.rend(); it++) {
+ auto key = it->first;
+ auto& bucket = it->second;
+
+ clogger.trace("Key {}, now {}", key, now);
+
+ if (bucket.size() >= size_t(min_threshold) && key >= now) {
+ // If we're in the newest bucket, we'll use STCS to prioritize sstables
+ // NOTE(review): min_threshold/max_threshold are not forwarded to the size-tiered helper,
+ // unlike in the imported Java code -- confirm the helper's defaults are intended.
+ auto stcs_interesting_bucket = size_tiered_most_interesting_bucket(bucket);
+
+ // If the tables in the current bucket aren't eligible in the STCS strategy, we'll skip it and look for other buckets
+ if (!stcs_interesting_bucket.empty()) {
+ return stcs_interesting_bucket;
+ }
+ } else if (bucket.size() >= 2 && key < now) {
+ clogger.debug("bucket size {} >= 2 and not in current bucket, compacting what's here", bucket.size());
+ return trim_to_threshold(std::move(bucket), max_threshold);
+ } else {
+ clogger.debug("No compaction necessary for bucket size {} , key {}, now {}", bucket.size(), key, now);
+ }
+ }
+ return {};
+ }
+
+ // Returns at most max_threshold sstables from bucket: the ones with the smallest
+ // ondisk_data_size(), in ascending size order.
+ static std::vector<shared_sstable>
+ trim_to_threshold(std::vector<shared_sstable> bucket, int max_threshold) {
+ auto n = std::min(bucket.size(), size_t(max_threshold));
+ // Trim the largest sstables off the end to meet the maxThreshold
+ // Only the first n elements need to be ordered, hence partial_sort.
+ boost::partial_sort(bucket, bucket.begin() + n, [] (auto& i, auto& j) {
+ return i->ondisk_data_size() < j->ondisk_data_size();
+ });
+ bucket.resize(n);
+ return bucket;
+ }
+private:
+ // Recomputes _estimated_remaining_tasks: one pending task per bucket that holds enough sstables.
+ // Buckets at or after the highest window seen need min_threshold sstables; older buckets need 2.
+ void update_estimated_compaction_by_tasks(std::map<timestamp_type, std::vector<shared_sstable>>& tasks, int min_threshold) {
+ int64_t n = 0;
+ timestamp_type now = _highest_window_seen;
+
+ // Iterate by const reference: copying each entry would copy its whole vector of sstable pointers.
+ for (const auto& task : tasks) {
+ auto key = task.first;
+
+ // For current window, make sure it's compactable
+ auto count = task.second.size();
+ if (key >= now && count >= size_t(min_threshold)) {
+ n++;
+ } else if (key < now && count >= 2) {
+ n++;
+ }
+ }
+ _estimated_remaining_tasks = n;
+ }
+public:
+ // Returns the estimate last computed by update_estimated_compaction_by_tasks(); cf is not used.
+ virtual int64_t estimated_pending_compactions(column_family& cf) const override {
+ return _estimated_remaining_tasks;
+ }
+
+ // Placeholder: always throws. The strategy is not yet registered as a compaction_strategy_type in this patch.
+ virtual compaction_strategy_type type() const {
+ throw std::runtime_error("TWCS not wired yet");
+ }
+};
+
+}
diff --git a/sstables/TimeWindowCompactionStrategy.java b/sstables/TimeWindowCompactionStrategy.java
deleted file mode 100644
index e2ab7dc..0000000
--- a/sstables/TimeWindowCompactionStrategy.java
+++ /dev/null
@@ -1,380 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.db.compaction;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.TreeSet;
-import java.util.concurrent.TimeUnit;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.lifecycle.SSTableSet;
-import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.schema.CompactionParams;
-import org.apache.cassandra.utils.Pair;
-
-import static com.google.common.collect.Iterables.filter;
-
-public class TimeWindowCompactionStrategy extends AbstractCompactionStrategy
-{
- private static final Logger logger = LoggerFactory.getLogger(TimeWindowCompactionStrategy.class);
-
- private final TimeWindowCompactionStrategyOptions options;
- protected volatile int estimatedRemainingTasks;
- private final Set<SSTableReader> sstables = new HashSet<>();
- private long lastExpiredCheck;
- private long highestWindowSeen;
-
- public TimeWindowCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options)
- {
- super(cfs, options);
- this.estimatedRemainingTasks = 0;
- this.options = new TimeWindowCompactionStrategyOptions(options);
- if (!options.containsKey(AbstractCompactionStrategy.TOMBSTONE_COMPACTION_INTERVAL_OPTION) && !options.containsKey(AbstractCompactionStrategy.TOMBSTONE_THRESHOLD_OPTION))
- {
- disableTombstoneCompactions = true;
- logger.debug("Disabling tombstone compactions for TWCS");
- }
- else
- logger.debug("Enabling tombstone compactions for TWCS");
-
- }
-
- @Override
- public AbstractCompactionTask getNextBackgroundTask(int gcBefore)
- {
- while (true)
- {
- List<SSTableReader> latestBucket = getNextBackgroundSSTables(gcBefore);
-
- if (latestBucket.isEmpty())
- return null;
-
- LifecycleTransaction modifier = cfs.getTracker().tryModify(latestBucket, OperationType.COMPACTION);
- if (modifier != null)
- return new CompactionTask(cfs, modifier, gcBefore);
- }
- }
-
- /**
- *
- * @param gcBefore
- * @return
- */
- private synchronized List<SSTableReader> getNextBackgroundSSTables(final int gcBefore)
- {
- if (Iterables.isEmpty(cfs.getSSTables(SSTableSet.LIVE)))
- return Collections.emptyList();
-
- Set<SSTableReader> uncompacting = ImmutableSet.copyOf(filter(cfs.getUncompactingSSTables(), sstables::contains));
-
- // Find fully expired SSTables. Those will be included no matter what.
- Set<SSTableReader> expired = Collections.emptySet();
-
- if (System.currentTimeMillis() - lastExpiredCheck > options.expiredSSTableCheckFrequency)
- {
- logger.debug("TWCS expired check sufficiently far in the past, checking for fully expired SSTables");
- expired = CompactionController.getFullyExpiredSSTables(cfs, uncompacting, cfs.getOverlappingLiveSSTables(uncompacting), gcBefore);
- lastExpiredCheck = System.currentTimeMillis();
- }
- else
- {
- logger.debug("TWCS skipping check for fully expired SSTables");
- }
-
- Set<SSTableReader> candidates = Sets.newHashSet(filterSuspectSSTables(uncompacting));
-
- List<SSTableReader> compactionCandidates = new ArrayList<>(getNextNonExpiredSSTables(Sets.difference(candidates, expired), gcBefore));
- if (!expired.isEmpty())
- {
- logger.debug("Including expired sstables: {}", expired);
- compactionCandidates.addAll(expired);
- }
-
- return compactionCandidates;
- }
-
- private List<SSTableReader> getNextNonExpiredSSTables(Iterable<SSTableReader> nonExpiringSSTables, final int gcBefore)
- {
- List<SSTableReader> mostInteresting = getCompactionCandidates(nonExpiringSSTables);
-
- if (mostInteresting != null)
- {
- return mostInteresting;
- }
-
- // if there is no sstable to compact in standard way, try compacting single sstable whose droppable tombstone
- // ratio is greater than threshold.
- List<SSTableReader> sstablesWithTombstones = new ArrayList<>();
- for (SSTableReader sstable : nonExpiringSSTables)
- {
- if (worthDroppingTombstones(sstable, gcBefore))
- sstablesWithTombstones.add(sstable);
- }
- if (sstablesWithTombstones.isEmpty())
- return Collections.emptyList();
-
- return Collections.singletonList(Collections.min(sstablesWithTombstones, new SSTableReader.SizeComparator()));
- }
-
- private List<SSTableReader> getCompactionCandidates(Iterable<SSTableReader> candidateSSTables)
- {
- Pair<HashMultimap<Long, SSTableReader>, Long> buckets = getBuckets(candidateSSTables, options.sstableWindowUnit, options.sstableWindowSize, options.timestampResolution);
- // Update the highest window seen, if necessary
- if(buckets.right > this.highestWindowSeen)
- this.highestWindowSeen = buckets.right;
-
- updateEstimatedCompactionsByTasks(buckets.left);
- List<SSTableReader> mostInteresting = newestBucket(buckets.left,
- cfs.getMinimumCompactionThreshold(),
- cfs.getMaximumCompactionThreshold(),
- options.sstableWindowUnit,
- options.sstableWindowSize,
- options.stcsOptions,
- this.highestWindowSeen);
- if (!mostInteresting.isEmpty())
- return mostInteresting;
- return null;
- }
-
- @Override
- public void addSSTable(SSTableReader sstable)
- {
- sstables.add(sstable);
- }
-
- @Override
- public void removeSSTable(SSTableReader sstable)
- {
- sstables.remove(sstable);
- }
-
- /**
- * Find the lowest and highest timestamps in a given timestamp/unit pair
- * Returns milliseconds, caller should adjust accordingly
- */
- public static Pair<Long,Long> getWindowBoundsInMillis(TimeUnit windowTimeUnit, int windowTimeSize, long timestampInMillis)
- {
- long lowerTimestamp;
- long upperTimestamp;
- long timestampInSeconds = TimeUnit.SECONDS.convert(timestampInMillis, TimeUnit.MILLISECONDS);
-
- switch(windowTimeUnit)
- {
- case MINUTES:
- lowerTimestamp = timestampInSeconds - ((timestampInSeconds) % (60 * windowTimeSize));
- upperTimestamp = (lowerTimestamp + (60L * (windowTimeSize - 1L))) + 59L;
- break;
- case HOURS:
- lowerTimestamp = timestampInSeconds - ((timestampInSeconds) % (3600 * windowTimeSize));
- upperTimestamp = (lowerTimestamp + (3600L * (windowTimeSize - 1L))) + 3599L;
- break;
- case DAYS:
- default:
- lowerTimestamp = timestampInSeconds - ((timestampInSeconds) % (86400 * windowTimeSize));
- upperTimestamp = (lowerTimestamp + (86400L * (windowTimeSize - 1L))) + 86399L;
- break;
- }
-
- return Pair.create(TimeUnit.MILLISECONDS.convert(lowerTimestamp, TimeUnit.SECONDS),
- TimeUnit.MILLISECONDS.convert(upperTimestamp, TimeUnit.SECONDS));
-
- }
-
- /**
- * Group files with similar max timestamp into buckets.
- *
- * @param files pairs consisting of a file and its min timestamp
- * @param sstableWindowUnit
- * @param sstableWindowSize
- * @param timestampResolution
- * @return A pair, where the left element is the bucket representation (map of timestamp to sstablereader), and the right is the highest timestamp seen
- */
- @VisibleForTesting
- static Pair<HashMultimap<Long, SSTableReader>, Long> getBuckets(Iterable<SSTableReader> files, TimeUnit sstableWindowUnit, int sstableWindowSize, TimeUnit timestampResolution)
- {
- HashMultimap<Long, SSTableReader> buckets = HashMultimap.create();
-
- long maxTimestamp = 0;
- // Create hash map to represent buckets
- // For each sstable, add sstable to the time bucket
- // Where the bucket is the file's max timestamp rounded to the nearest window bucket
- for (SSTableReader f : files)
- {
- assert TimeWindowCompactionStrategyOptions.validTimestampTimeUnits.contains(timestampResolution);
- long tStamp = TimeUnit.MILLISECONDS.convert(f.getMaxTimestamp(), timestampResolution);
- Pair<Long,Long> bounds = getWindowBoundsInMillis(sstableWindowUnit, sstableWindowSize, tStamp);
- buckets.put(bounds.left, f);
- if (bounds.left > maxTimestamp)
- maxTimestamp = bounds.left;
- }
-
- logger.trace("buckets {}, max timestamp", buckets, maxTimestamp);
- return Pair.create(buckets, maxTimestamp);
- }
-
- private void updateEstimatedCompactionsByTasks(HashMultimap<Long, SSTableReader> tasks)
- {
- int n = 0;
- long now = this.highestWindowSeen;
-
- for(Long key : tasks.keySet())
- {
- // For current window, make sure it's compactable
- if (key.compareTo(now) >= 0 && tasks.get(key).size() >= cfs.getMinimumCompactionThreshold())
- n++;
- else if (key.compareTo(now) < 0 && tasks.get(key).size() >= 2)
- n++;
- }
- this.estimatedRemainingTasks = n;
- }
-
-
- /**
- * @param buckets list of buckets, sorted from newest to oldest, from which to return the newest bucket within thresholds.
- * @param minThreshold minimum number of sstables in a bucket to qualify.
- * @param maxThreshold maximum number of sstables to compact at once (the returned bucket will be trimmed down to this).
- * @return a bucket (list) of sstables to compact.
- */
- @VisibleForTesting
- static List<SSTableReader> newestBucket(HashMultimap<Long, SSTableReader> buckets, int minThreshold, int maxThreshold, TimeUnit sstableWindowUnit, int sstableWindowSize, SizeTieredCompactionStrategyOptions stcsOptions, long now)
- {
- // If the current bucket has at least minThreshold SSTables, choose that one.
- // For any other bucket, at least 2 SSTables is enough.
- // In any case, limit to maxThreshold SSTables.
-
- TreeSet<Long> allKeys = new TreeSet<>(buckets.keySet());
-
- Iterator<Long> it = allKeys.descendingIterator();
- while(it.hasNext())
- {
- Long key = it.next();
- Set<SSTableReader> bucket = buckets.get(key);
- logger.trace("Key {}, now {}", key, now);
- if (bucket.size() >= minThreshold && key >= now)
- {
- // If we're in the newest bucket, we'll use STCS to prioritize sstables
- List<Pair<SSTableReader,Long>> pairs = SizeTieredCompactionStrategy.createSSTableAndLengthPairs(bucket);
- List<List<SSTableReader>> stcsBuckets = SizeTieredCompactionStrategy.getBuckets(pairs, stcsOptions.bucketHigh, stcsOptions.bucketLow, stcsOptions.minSSTableSize);
- logger.debug("Using STCS compaction for first window of bucket: data files {} , options {}", pairs, stcsOptions);
- List<SSTableReader> stcsInterestingBucket = SizeTieredCompactionStrategy.mostInterestingBucket(stcsBuckets, minThreshold, maxThreshold);
-
- // If the tables in the current bucket aren't eligible in the STCS strategy, we'll skip it and look for other buckets
- if (!stcsInterestingBucket.isEmpty())
- return stcsInterestingBucket;
- }
- else if (bucket.size() >= 2 && key < now)
- {
- logger.debug("bucket size {} >= 2 and not in current bucket, compacting what's here: {}", bucket.size(), bucket);
- return trimToThreshold(bucket, maxThreshold);
- }
- else
- {
- logger.debug("No compaction necessary for bucket size {} , key {}, now {}", bucket.size(), key, now);
- }
- }
- return Collections.<SSTableReader>emptyList();
- }
-
- /**
- * @param bucket set of sstables
- * @param maxThreshold maximum number of sstables in a single compaction task.
- * @return A bucket trimmed to the maxThreshold newest sstables.
- */
- @VisibleForTesting
- static List<SSTableReader> trimToThreshold(Set<SSTableReader> bucket, int maxThreshold)
- {
- List<SSTableReader> ssTableReaders = new ArrayList<>(bucket);
-
- // Trim the largest sstables off the end to meet the maxThreshold
- Collections.sort(ssTableReaders, new SSTableReader.SizeComparator());
-
- return ImmutableList.copyOf(Iterables.limit(ssTableReaders, maxThreshold));
- }
-
- @Override
- public synchronized Collection<AbstractCompactionTask> getMaximalTask(int gcBefore, boolean splitOutput)
- {
- Iterable<SSTableReader> filteredSSTables = filterSuspectSSTables(sstables);
- if (Iterables.isEmpty(filteredSSTables))
- return null;
- LifecycleTransaction txn = cfs.getTracker().tryModify(filteredSSTables, OperationType.COMPACTION);
- if (txn == null)
- return null;
- return Collections.singleton(new CompactionTask(cfs, txn, gcBefore));
- }
-
- @Override
- public synchronized AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)
- {
- assert !sstables.isEmpty(); // checked for by CM.submitUserDefined
-
- LifecycleTransaction modifier = cfs.getTracker().tryModify(sstables, OperationType.COMPACTION);
- if (modifier == null)
- {
- logger.debug("Unable to mark {} for compaction; probably a background compaction got to it first. You can disable background compactions temporarily if this is a problem", sstables);
- return null;
- }
-
- return new CompactionTask(cfs, modifier, gcBefore).setUserDefined(true);
- }
-
- public int getEstimatedRemainingTasks()
- {
- return this.estimatedRemainingTasks;
- }
-
- public long getMaxSSTableBytes()
- {
- return Long.MAX_VALUE;
- }
-
-
- public static Map<String, String> validateOptions(Map<String, String> options) throws ConfigurationException
- {
- Map<String, String> uncheckedOptions = AbstractCompactionStrategy.validateOptions(options);
- uncheckedOptions = TimeWindowCompactionStrategyOptions.validateOptions(options, uncheckedOptions);
-
- uncheckedOptions.remove(CompactionParams.Option.MIN_THRESHOLD.toString());
- uncheckedOptions.remove(CompactionParams.Option.MAX_THRESHOLD.toString());
-
- return uncheckedOptions;
- }
-
- public String toString()
- {
- return String.format("TimeWindowCompactionStrategy[%s/%s]",
- cfs.getMinimumCompactionThreshold(),
- cfs.getMaximumCompactionThreshold());
- }
-}
--
2.9.4

Raphael S. Carvalho

<raphaelsc@scylladb.com>
unread,
Jul 17, 2017, 1:55:43 AM7/17/17
to scylladb-dev@googlegroups.com, Raphael S. Carvalho
Signed-off-by: Raphael S. Carvalho <raph...@scylladb.com>
---
compaction_strategy.hh | 5 +++++
sstables/time_window_compaction_strategy.hh | 2 +-
sstables/compaction_strategy.cc | 4 ++++
3 files changed, 10 insertions(+), 1 deletion(-)

diff --git a/compaction_strategy.hh b/compaction_strategy.hh
index 8f2aef4..32be96d 100644
--- a/compaction_strategy.hh
+++ b/compaction_strategy.hh
@@ -33,6 +33,7 @@ enum class compaction_strategy_type {
size_tiered,
leveled,
date_tiered,
+ time_window,
};

class compaction_strategy_impl;
@@ -82,6 +83,8 @@ class compaction_strategy {
return "LeveledCompactionStrategy";
case compaction_strategy_type::date_tiered:
return "DateTieredCompactionStrategy";
+ case compaction_strategy_type::time_window:
+ return "TimeWindowCompactionStrategy";
default:
throw std::runtime_error("Invalid Compaction Strategy");
}
@@ -100,6 +103,8 @@ class compaction_strategy {
return compaction_strategy_type::leveled;
} else if (short_name == "DateTieredCompactionStrategy") {
return compaction_strategy_type::date_tiered;
+ } else if (short_name == "TimeWindowCompactionStrategy") {
+ return compaction_strategy_type::time_window;
} else {
throw exceptions::configuration_exception(sprint("Unable to find compaction strategy class '%s'", name));
}
diff --git a/sstables/time_window_compaction_strategy.hh b/sstables/time_window_compaction_strategy.hh
index f213c01..a6fe779 100644
--- a/sstables/time_window_compaction_strategy.hh
+++ b/sstables/time_window_compaction_strategy.hh
@@ -251,7 +251,7 @@ class time_window_compaction_strategy : public compaction_strategy_impl {
}

virtual compaction_strategy_type type() const {
- throw std::runtime_error("TWCS not wired yet");
+ return compaction_strategy_type::time_window;
}
};

diff --git a/sstables/compaction_strategy.cc b/sstables/compaction_strategy.cc
index aa5ea5b..f396b89 100644
--- a/sstables/compaction_strategy.cc
+++ b/sstables/compaction_strategy.cc
@@ -55,6 +55,7 @@
#include "size_tiered_compaction_strategy.hh"
#include "date_tiered_compaction_strategy.hh"
#include "leveled_compaction_strategy.hh"
+#include "time_window_compaction_strategy.hh"

logging::logger date_tiered_manifest::logger = logging::logger("DateTieredCompactionStrategy");
logging::logger leveled_manifest::logger("LeveledManifest");
@@ -456,6 +457,9 @@ compaction_strategy make_compaction_strategy(compaction_strategy_type strategy,
case compaction_strategy_type::date_tiered:
impl = make_shared<date_tiered_compaction_strategy>(date_tiered_compaction_strategy(options));
break;
+ case compaction_strategy_type::time_window:
+ impl = make_shared<time_window_compaction_strategy>(time_window_compaction_strategy(options));
+ break;
default:
throw std::runtime_error("strategy not supported");
}
--
2.9.4

Raphael S. Carvalho

<raphaelsc@scylladb.com>
unread,
Jul 17, 2017, 1:55:45 AM7/17/17
to scylladb-dev@googlegroups.com, Raphael S. Carvalho
Signed-off-by: Raphael S. Carvalho <raph...@scylladb.com>
---
tests/sstable_datafile_test.cc | 121 +++++++++++++++++++++++++++++++++++++++++
1 file changed, 121 insertions(+)

diff --git a/tests/sstable_datafile_test.cc b/tests/sstable_datafile_test.cc
index da77d35..6fca158 100644
--- a/tests/sstable_datafile_test.cc
+++ b/tests/sstable_datafile_test.cc
@@ -42,6 +42,7 @@
#include "partition_slice_builder.hh"
#include "sstables/compaction_strategy_impl.hh"
#include "sstables/date_tiered_compaction_strategy.hh"
+#include "sstables/time_window_compaction_strategy.hh"
#include "mutation_assertions.hh"
#include "mutation_reader_assertions.hh"
#include "counters.hh"
@@ -69,6 +70,8 @@ atomic_cell make_atomic_cell(bytes_view value, uint32_t ttl = 0, uint32_t expira
}
}

+static shared_sstable make_sstable_containing(std::function<shared_sstable()> sst_factory, std::vector<mutation> muts);
+
SEASTAR_TEST_CASE(datafile_generation_01) {
// Data file with clustering key
//
@@ -3105,6 +3108,124 @@ SEASTAR_TEST_CASE(date_tiered_strategy_test_2) {
return make_ready_future<>();
}

+SEASTAR_TEST_CASE(time_window_strategy_time_window_tests) {
+ using namespace std::chrono;
+
+ api::timestamp_type tstamp1 = duration_cast<microseconds>(milliseconds(1451001601000L)).count(); // 2015-12-25 @ 00:00:01 UTC, given in milliseconds and converted to microseconds
+ api::timestamp_type tstamp2 = duration_cast<microseconds>(milliseconds(1451088001000L)).count(); // 2015-12-26 @ 00:00:01 UTC, given in milliseconds and converted to microseconds
+ api::timestamp_type low_hour = duration_cast<microseconds>(milliseconds(1451001600000L)).count(); // 2015-12-25 @ 00:00:00 UTC, given in milliseconds and converted to microseconds
+
+
+ // A 1 hour window should round down to the beginning of the hour
+ BOOST_REQUIRE(time_window_compaction_strategy::get_window_lower_bound(duration_cast<seconds>(hours(1)), tstamp1) == low_hour);
+
+ // A 1 minute window should round down to the beginning of the minute (here also 00:00:00)
+ BOOST_REQUIRE(time_window_compaction_strategy::get_window_lower_bound(duration_cast<seconds>(minutes(1)), tstamp1) == low_hour);
+
+ // A 1 day window should round down to the beginning of the day (here also 00:00:00)
+ BOOST_REQUIRE(time_window_compaction_strategy::get_window_lower_bound(duration_cast<seconds>(hours(24)), tstamp1) == low_hour);
+
+ // The 2 day window of 2015-12-25 + 2015-12-26 should round down to the beginning of 2015-12-25
+ BOOST_REQUIRE(time_window_compaction_strategy::get_window_lower_bound(duration_cast<seconds>(hours(24*2)), tstamp2) == low_hour);
+
+ return make_ready_future<>();
+}
+
+SEASTAR_TEST_CASE(time_window_strategy_correctness_test) {
+ using namespace std::chrono;
+
+ return seastar::async([] {
+ auto s = schema_builder("tests", "time_window_strategy")
+ .with_column("id", utf8_type, column_kind::partition_key)
+ .with_column("value", int32_type).build();
+
+ auto tmp = make_lw_shared<tmpdir>();
+ auto sst_gen = [s, tmp, gen = make_lw_shared<unsigned>(1)] () mutable {
+ return make_lw_shared<sstable>(s, tmp->path, (*gen)++, la, big);
+ };
+
+ auto make_insert = [&] (partition_key key, api::timestamp_type t) {
+ mutation m(key, s);
+ m.set_clustered_cell(clustering_key::make_empty(), bytes("value"), data_value(int32_t(1)), t);
+ return m;
+ };
+
+ api::timestamp_type tstamp = api::timestamp_clock::now().time_since_epoch().count();
+ api::timestamp_type tstamp2 = tstamp - duration_cast<microseconds>(seconds(2L * 3600L)).count();
+
+ std::vector<shared_sstable> sstables;
+
+ // create 5 sstables
+ for (api::timestamp_type t = 0; t < 3; t++) {
+ auto key = partition_key::from_exploded(*s, {to_bytes("key" + to_sstring(t))});
+ auto mut = make_insert(std::move(key), t);
+ sstables.push_back(make_sstable_containing(sst_gen, {std::move(mut)}));
+ }
+ // NOTE(review): these two are written with timestamps 3 and 4, not tstamp2; they only land in the older bucket by hand below
+ for (api::timestamp_type t = 3; t < 5; t++) {
+ // Each of them holds a single cell, like the three sstables above
+ auto key = partition_key::from_exploded(*s, {to_bytes("key" + to_sstring(t))});
+ auto mut = make_insert(std::move(key), t);
+ sstables.push_back(make_sstable_containing(sst_gen, {std::move(mut)}));
+ }
+
+ std::map<api::timestamp_type, std::vector<shared_sstable>> buckets;
+
+ // We'll put 3 sstables into the newest bucket
+ for (api::timestamp_type i = 0; i < 3; i++) {
+ auto bound = time_window_compaction_strategy::get_window_lower_bound(duration_cast<seconds>(hours(1)), tstamp);
+ buckets[bound].push_back(sstables[i]);
+ }
+ auto now = api::timestamp_clock::now().time_since_epoch().count();
+ auto new_bucket = time_window_compaction_strategy::newest_bucket(buckets, 4, 32, duration_cast<seconds>(hours(1)),
+ time_window_compaction_strategy::get_window_lower_bound(duration_cast<seconds>(hours(1)), now));
+ // incoming bucket should not be accepted when it has below the min threshold SSTables
+ BOOST_REQUIRE(new_bucket.empty());
+
+ now = api::timestamp_clock::now().time_since_epoch().count();
+ new_bucket = time_window_compaction_strategy::newest_bucket(buckets, 2, 32, duration_cast<seconds>(hours(1)),
+ time_window_compaction_strategy::get_window_lower_bound(duration_cast<seconds>(hours(1)), now));
+ // incoming bucket should be accepted when it is larger than the min threshold SSTables
+ // FIXME: enable check below once twcs passes min threshold to size tiered.
+ // BOOST_REQUIRE(!new_bucket.empty());
+
+ // And 2 into the second bucket (the window of tstamp2, i.e. 2 hours before tstamp)
+ for (api::timestamp_type i = 3; i < 5; i++) {
+ auto bound = time_window_compaction_strategy::get_window_lower_bound(duration_cast<seconds>(hours(1)), tstamp2);
+ buckets[bound].push_back(sstables[i]);
+ }
+
+ // "an sstable with a single value should have equal min/max timestamps"
+ for (auto& sst : sstables) {
+ BOOST_REQUIRE(sst->get_stats_metadata().min_timestamp == sst->get_stats_metadata().max_timestamp);
+ }
+
+ // Test trim
+ auto num_sstables = 40;
+ for (int r = 5; r < num_sstables; r++) {
+ auto key = partition_key::from_exploded(*s, {to_bytes("key" + to_sstring(r))});
+ std::vector<mutation> mutations;
+ for (int i = 0 ; i < r ; i++) {
+ mutations.push_back(make_insert(key, tstamp + r));
+ }
+ sstables.push_back(make_sstable_containing(sst_gen, std::move(mutations)));
+ }
+
+ // Overfill the buckets now. NOTE(review): buckets is not cleared first, so the entries added above remain
+ for (int i = 0 ; i < 40; i++) {
+ auto bound = time_window_compaction_strategy::get_window_lower_bound(duration_cast<seconds>(hours(1)),
+ sstables[i]->get_stats_metadata().max_timestamp);
+ buckets[bound].push_back(sstables[i]);
+ }
+
+ now = api::timestamp_clock::now().time_since_epoch().count();
+ new_bucket = time_window_compaction_strategy::newest_bucket(buckets, 4, 32, duration_cast<seconds>(hours(1)),
+ time_window_compaction_strategy::get_window_lower_bound(duration_cast<seconds>(hours(1)), now));
+ // new bucket should be trimmed to max threshold of 32
+ BOOST_REQUIRE(new_bucket.size() == size_t(32));
+ });
+}
+
SEASTAR_TEST_CASE(test_promoted_index_read) {
// create table promoted_index_read (
// pk int,
--
2.9.4

Avi Kivity

<avi@scylladb.com>
unread,
Jul 17, 2017, 3:16:31 AM7/17/17
to Raphael S. Carvalho, scylladb-dev@googlegroups.com
Nicer: window_size_days * 86400s.

> + static constexpr int DEFAULT_COMPACTION_WINDOW_SIZE = 1;
> + static constexpr std::chrono::seconds DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS() { return std::chrono::seconds(60*10); }

600s.
get_fully_expired_sstables() should be changed (later) to accept a
gc_clock::time_point.

> + _last_expired_check = db_clock::now();
> + } else {
> + clogger.debug("TWCS skipping check for fully expired SSTables");
> + }
> +
> + std::vector<shared_sstable> non_expired;
> +
> + if (!expired.empty()) {
> + auto cmp = [] (const shared_sstable& x, const shared_sstable& y) {
> + return x->generation() < y->generation();
> + };
> + boost::range::sort(candidates, cmp);
> + boost::range::sort(expired, cmp);
> + boost::set_difference(candidates, expired, std::back_inserter(non_expired), cmp);

Faster:

auto expired_as_set =
boost::copy_range<std::unordered_set<shared_sstable>>(expired);
auto is_expired = [&] (const shared_sstable& s) { return
expired_as_set.find(s) != expired_as_set.end(); };
candidates.erase(boost::remove_if(candidates, is_expired),
candidates.end());

Since it's O(n), not O(n log n).

> + } else {
> + non_expired = std::move(candidates);
> + }
> +
> + auto compaction_candidates = get_next_non_expired_sstables(cf, std::move(non_expired), gc_before);
> + if (!expired.empty()) {
> + compaction_candidates.insert(compaction_candidates.end(), expired.begin(), expired.end());
> + }
> + return compaction_candidates;
> + }
> +private:
> + std::vector<shared_sstable>
> + get_next_non_expired_sstables(column_family& cf, std::vector<shared_sstable> non_expiring_sstables, gc_clock::time_point gc_before) {
> + auto most_interesting = get_compaction_candidates(cf, non_expiring_sstables);
> +
> + if (!most_interesting.empty()) {
> + return most_interesting;
> + }
> +
> + // if there is no sstable to compact in standard way, try compacting single sstable whose droppable tombstone
> + // ratio is greater than threshold.
> + auto e = boost::range::remove_if(non_expiring_sstables, [this, &gc_before] (const shared_sstable& sst) -> bool {
> + return !worth_dropping_tombstones(sst, gc_before);
> + });

For later: here, we prioritize different types of compaction, but there
may be a high priority compaction for another table that is being
preempted by a low priority compaction for this table.

A compaction_strategy should estimate the importance of a compaction
job, and the compaction manager should pick the most important one (a
compaction that reduces read amplification on a heavily read table vs. a
compaction that drops some tombstones). The amount of free space would
also figure in the selection.
for (auto&& key_bucket : boost::adaptors::reversed) {

Avi Kivity

<avi@scylladb.com>
unread,
Jul 17, 2017, 3:17:34 AM7/17/17
to Raphael S. Carvalho, scylladb-dev@googlegroups.com


On 07/17/2017 08:55 AM, Raphael S. Carvalho wrote:
> Time window strategy was introduced to address several limitations of
> date tiered strategy. In addition, its options are much easier to reason
> about, basically just window size and window unit.
> TWCS will work to keep only one sstable in each window. So the only real
> optimization needed is to align partition key to the window.
> Size tiered strategy is used to reduce write amplification when compacting
> the incoming window.
>
> For more details: https://issues.apache.org/jira/browse/CASSANDRA-9666
> Fixes #1432.

Looks good, I had just minor comments.

> TODO: override default values with options provided in schema

Please complete this and we can merge.

Raphael S. Carvalho

<raphaelsc@scylladb.com>
unread,
Jul 19, 2017, 2:15:19 AM7/19/17
to scylladb-dev@googlegroups.com, Raphael S. Carvalho
Time window strategy was introduced to address several limitations of
date tiered strategy. In addition, its options are much easier to reason
about, basically just window size and window unit.
TWCS will work to keep only one sstable in each window. So the only real
optimization needed is to align partition key to the window.
Size tiered strategy is used to reduce write amplification when compacting
the incoming window.

For more details: https://issues.apache.org/jira/browse/CASSANDRA-9666
Fixes #1432.

also at: g...@github.com:raphaelsc/scylla.git twcs_v2

v2:
use chrono literals when possible
optimize code to filter out expired tables
override default values with options specified in schema

Raphael S. Carvalho (5):
sstables: import TimeWindowCompactionStrategy.java
sstables: implement time window compaction strategy
compaction/twcs: override default values with options in schema
compaction: wire up time window compaction strategy
tests: add tests for time window compaction strategy

compaction_strategy.hh | 5 +
sstables/time_window_compaction_strategy.hh | 302 ++++++++++++++++++++++++++++
sstables/compaction_strategy.cc | 4 +
tests/sstable_datafile_test.cc | 121 +++++++++++
4 files changed, 432 insertions(+)
create mode 100644 sstables/time_window_compaction_strategy.hh

--
2.9.4

Raphael S. Carvalho

<raphaelsc@scylladb.com>
unread,
Jul 19, 2017, 2:15:21 AM7/19/17
to scylladb-dev@googlegroups.com, Raphael S. Carvalho
It will later be converted to C++. Imported from the latest
scylla-tools-java repository; checked that nothing is missing.

Signed-off-by: Raphael S. Carvalho <raph...@scylladb.com>
---
sstables/TimeWindowCompactionStrategy.java | 380 +++++++++++++++++++++++++++++
1 file changed, 380 insertions(+)
create mode 100644 sstables/TimeWindowCompactionStrategy.java

diff --git a/sstables/TimeWindowCompactionStrategy.java b/sstables/TimeWindowCompactionStrategy.java
new file mode 100644
index 0000000..e2ab7dc
--- /dev/null
+++ b/sstables/TimeWindowCompactionStrategy.java
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ Set<SSTableReader> uncompacting = ImmutableSet.copyOf(filter(cfs.getUncompactingSSTables(), sstables::contains));
+
+ // Find fully expired SSTables. Those will be included no matter what.
+ }
+
+ // if there is no sstable to compact in standard way, try compacting single sstable whose droppable tombstone
+ // ratio is greater than threshold.
+ List<SSTableReader> sstablesWithTombstones = new ArrayList<>();
+ for (SSTableReader sstable : nonExpiringSSTables)
+ {
+ if (worthDroppingTombstones(sstable, gcBefore))
+ sstablesWithTombstones.add(sstable);
+ }
+ if (sstablesWithTombstones.isEmpty())
+ return Collections.emptyList();
+
+ return Collections.singletonList(Collections.min(sstablesWithTombstones, new SSTableReader.SizeComparator()));
+ }
+
+ private List<SSTableReader> getCompactionCandidates(Iterable<SSTableReader> candidateSSTables)
+ {
+ Pair<HashMultimap<Long, SSTableReader>, Long> buckets = getBuckets(candidateSSTables, options.sstableWindowUnit, options.sstableWindowSize, options.timestampResolution);
+ // Update the highest window seen, if necessary
+ /**
+ * Groups sstables into time buckets. Each sstable's max timestamp is converted from
+ * {@code timestampResolution} to milliseconds and passed to getWindowBoundsInMillis; the
+ * {@code left} value of the returned pair is used as the bucket key.
+ *
+ * @param files sstables to bucket
+ * @param sstableWindowUnit unit of the window size, forwarded to getWindowBoundsInMillis
+ * @param sstableWindowSize number of units per window, forwarded to getWindowBoundsInMillis
+ * @param timestampResolution unit in which the sstables' timestamps are expressed
+ * @return pair of (bucket key -> sstables, highest bucket key seen); the second value stays 0
+ * when no bucket key is greater than 0, e.g. when {@code files} is empty
+ */
+ static Pair<HashMultimap<Long, SSTableReader>, Long> getBuckets(Iterable<SSTableReader> files, TimeUnit sstableWindowUnit, int sstableWindowSize, TimeUnit timestampResolution)
+ {
+ HashMultimap<Long, SSTableReader> buckets = HashMultimap.create();
+
+ long maxTimestamp = 0;
+ // Create hash map to represent buckets
+ // For each sstable, add sstable to the time bucket
+ // Where the bucket is the file's max timestamp rounded to the nearest window bucket
+ for (SSTableReader f : files)
+ {
+ assert TimeWindowCompactionStrategyOptions.validTimestampTimeUnits.contains(timestampResolution);
+ long tStamp = TimeUnit.MILLISECONDS.convert(f.getMaxTimestamp(), timestampResolution);
+ Pair<Long,Long> bounds = getWindowBoundsInMillis(sstableWindowUnit, sstableWindowSize, tStamp);
+ buckets.put(bounds.left, f);
+ if (bounds.left > maxTimestamp)
+ maxTimestamp = bounds.left;
+ }
+
+ // One placeholder per argument; without the second "{}" the max timestamp is never printed.
+ logger.trace("buckets {}, max timestamp {}", buckets, maxTimestamp);
+ return Pair.create(buckets, maxTimestamp);
+ }
+
+ /**
+ * Recomputes {@code estimatedRemainingTasks} as the number of buckets that currently qualify
+ * for compaction. A bucket whose key is at or after {@code highestWindowSeen} qualifies with at
+ * least {@code cfs.getMinimumCompactionThreshold()} sstables; an older bucket qualifies with at
+ * least 2.
+ *
+ * @param tasks sstables grouped by bucket key
+ */
+ private void updateEstimatedCompactionsByTasks(HashMultimap<Long, SSTableReader> tasks)
+ {
+ int n = 0;
+ // "now" is the highest window recorded so far, not the wall-clock time.
+ long now = this.highestWindowSeen;
+
+ for(Long key : tasks.keySet())
+ {
+ // For current window, make sure it's compactable
+ if (key.compareTo(now) >= 0 && tasks.get(key).size() >= cfs.getMinimumCompactionThreshold())
+ n++;
+ // Older windows only need two sstables to be worth compacting.
+ else if (key.compareTo(now) < 0 && tasks.get(key).size() >= 2)
+ n++;
+ }
+ this.estimatedRemainingTasks = n;
+ }
+
+
+ /**
+ * Picks the sstables to compact next, visiting buckets from the highest key to the lowest and
+ * returning the first non-empty selection.
+ *
+ * @param buckets sstables grouped by bucket key; keys are visited in descending order.
+ * @param minThreshold minimum number of sstables a bucket with key &gt;= {@code now} needs to qualify.
+ * @param maxThreshold maximum number of sstables to compact at once (the returned bucket will be trimmed down to this).
+ * @param sstableWindowUnit not used in this method body.
+ * @param sstableWindowSize not used in this method body.
+ * @param stcsOptions size-tiered options (bucketHigh, bucketLow, minSSTableSize) used for a bucket with key &gt;= {@code now}.
+ * @param now bucket key treated as the current window.
+ * @return a bucket (list) of sstables to compact; empty if no bucket qualifies.
+ */
+ @VisibleForTesting
+ static List<SSTableReader> newestBucket(HashMultimap<Long, SSTableReader> buckets, int minThreshold, int maxThreshold, TimeUnit sstableWindowUnit, int sstableWindowSize, SizeTieredCompactionStrategyOptions stcsOptions, long now)
+ {
+ // If the current bucket has at least minThreshold SSTables, choose that one.
+ // For any other bucket, at least 2 SSTables is enough.
+ // In any case, limit to maxThreshold SSTables.
+
+ // Copy the keys into a sorted set so they can be walked from highest to lowest.
+ TreeSet<Long> allKeys = new TreeSet<>(buckets.keySet());
+
+ Iterator<Long> it = allKeys.descendingIterator();
+ while(it.hasNext())
+ {
+ Long key = it.next();
+ Set<SSTableReader> bucket = buckets.get(key);
+ logger.trace("Key {}, now {}", key, now);
+ if (bucket.size() >= minThreshold && key >= now)
+ {
+ // If we're in the newest bucket, we'll use STCS to prioritize sstables
+ List<Pair<SSTableReader,Long>> pairs = SizeTieredCompactionStrategy.createSSTableAndLengthPairs(bucket);
+ List<List<SSTableReader>> stcsBuckets = SizeTieredCompactionStrategy.getBuckets(pairs, stcsOptions.bucketHigh, stcsOptions.bucketLow, stcsOptions.minSSTableSize);
+ logger.debug("Using STCS compaction for first window of bucket: data files {} , options {}", pairs, stcsOptions);
+ List<SSTableReader> stcsInterestingBucket = SizeTieredCompactionStrategy.mostInterestingBucket(stcsBuckets, minThreshold, maxThreshold);
+
+ // If the tables in the current bucket aren't eligible in the STCS strategy, we'll skip it and look for other buckets
+ if (!stcsInterestingBucket.isEmpty())
+ return stcsInterestingBucket;
+ }
+ else if (bucket.size() >= 2 && key < now)
+ {
+ logger.debug("bucket size {} >= 2 and not in current bucket, compacting what's here: {}", bucket.size(), bucket);
+ return trimToThreshold(bucket, maxThreshold);
+ }
+ else
+ {
+ logger.debug("No compaction necessary for bucket size {} , key {}, now {}", bucket.size(), key, now);
+ }
+ }
+ return Collections.<SSTableReader>emptyList();
+ }
+
+ /**
+ * Orders a bucket with {@code SSTableReader.SizeComparator} and keeps at most the first
+ * {@code maxThreshold} sstables of that ordering.
+ *
+ * @param bucket set of sstables
+ * @param maxThreshold maximum number of sstables in a single compaction task.
+ * @return an immutable list of at most maxThreshold sstables, selected by size rather than by age
+ * (presumably the smallest ones, going by the comment in the body; confirm against SizeComparator).
+ */
+ @VisibleForTesting
+ static List<SSTableReader> trimToThreshold(Set<SSTableReader> bucket, int maxThreshold)
+ {
+ // Copy into a list because the incoming set has no defined order to sort in place.
+ List<SSTableReader> ssTableReaders = new ArrayList<>(bucket);
+
+ // Trim the largest sstables off the end to meet the maxThreshold
+ Collections.sort(ssTableReaders, new SSTableReader.SizeComparator());
+
+ return ImmutableList.copyOf(Iterables.limit(ssTableReaders, maxThreshold));
+ }
+
+ /**
+ * Builds one compaction task covering every sstable that passes {@code filterSuspectSSTables}.
+ *
+ * @param gcBefore passed through to the created {@code CompactionTask}
+ * @param splitOutput not used in this method body
+ * @return a singleton collection with the task, or {@code null} when there is nothing to
+ * compact or {@code tryModify} returns {@code null}
+ */
+ @Override
+ public synchronized Collection<AbstractCompactionTask> getMaximalTask(int gcBefore, boolean splitOutput)
+ {
+ Iterable<SSTableReader> filteredSSTables = filterSuspectSSTables(sstables);
+ if (Iterables.isEmpty(filteredSSTables))
+ return null;
+ // tryModify returns null when the transaction could not be obtained.
+ LifecycleTransaction txn = cfs.getTracker().tryModify(filteredSSTables, OperationType.COMPACTION);
+ if (txn == null)
+ return null;
+ return Collections.singleton(new CompactionTask(cfs, txn, gcBefore));
+ }
+
+ @Override
+ public synchronized AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)
+ {
+ assert !sstables.isEmpty(); // checked for by CM.submitUserDefined
+
+ LifecycleTransaction modifier = cfs.getTracker().tryModify(sstables, OperationType.COMPACTION);
+ if (modifier == null)
+ {
+ logger.debug("Unable to mark {} for compaction; probably a background compaction got to it first. You can disable background compactions temporarily if this is a problem", sstables);
+ return null;
+ }
+

Raphael S. Carvalho

<raphaelsc@scylladb.com>
unread,
Jul 19, 2017, 2:15:24 AM7/19/17
to scylladb-dev@googlegroups.com, Raphael S. Carvalho
Signed-off-by: Raphael S. Carvalho <raph...@scylladb.com>
---
sstables/time_window_compaction_strategy.hh | 254 +++++++++++++++++++
sstables/TimeWindowCompactionStrategy.java | 380 ----------------------------
2 files changed, 254 insertions(+), 380 deletions(-)
create mode 100644 sstables/time_window_compaction_strategy.hh
delete mode 100644 sstables/TimeWindowCompactionStrategy.java

diff --git a/sstables/time_window_compaction_strategy.hh b/sstables/time_window_compaction_strategy.hh
new file mode 100644
index 0000000..8ced34c
--- /dev/null
+++ b/sstables/time_window_compaction_strategy.hh
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <boost/range/adaptors.hpp>
+
+namespace sstables {
+
+extern logging::logger clogger;
+
+using namespace std::chrono_literals;
+
+// Tunables of the time window compaction strategy, initialised to their defaults.
+struct time_window_compaction_strategy_options {
+ // Despite its name this returns a duration: window_size days expressed in seconds.
+ static constexpr std::chrono::seconds DEFAULT_COMPACTION_WINDOW_UNIT(int window_size) { return window_size * 86400s; }
+ static constexpr int DEFAULT_COMPACTION_WINDOW_SIZE = 1;
+ // Default interval between two checks for fully expired sstables: 600 seconds.
+ static constexpr std::chrono::seconds DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS() { return 600s; }
+
+ // Length of one time window (size multiplied by unit); defaults to one day.
+ std::chrono::seconds sstable_window_size = DEFAULT_COMPACTION_WINDOW_UNIT(DEFAULT_COMPACTION_WINDOW_SIZE);
+ // Minimum time that has to pass between two checks for fully expired sstables.
+ db_clock::duration expired_sstable_check_frequency = DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS();
+
+ // FIXME: override default values with the ones in schema.
+};
+
+using timestamp_type = api::timestamp_type;
+
+class time_window_compaction_strategy : public compaction_strategy_impl {
+ time_window_compaction_strategy_options _options;
+ int64_t _estimated_remaining_tasks = 0;
+ db_clock::time_point _last_expired_check;
+ timestamp_type _highest_window_seen;
+public:
+ // Constructs the strategy from the schema's compaction options. Tombstone compaction is
+ // disabled unless at least one of the two tombstone options is present in the map.
+ time_window_compaction_strategy(const std::map<sstring, sstring>& options)
+ : compaction_strategy_impl(options)
+ {
+ if (!options.count(TOMBSTONE_COMPACTION_INTERVAL_OPTION) && !options.count(TOMBSTONE_THRESHOLD_OPTION)) {
+ _disable_tombstone_compaction = true;
+ clogger.debug("Disabling tombstone compactions for TWCS");
+ } else {
+ clogger.debug("Enabling tombstone compactions for TWCS");
+ }
+ // Set unconditionally, independent of the options passed in.
+ _use_clustering_key_filter = true;
+ }
+
+ // Selects the sstables for the next compaction: the pick made by
+ // get_next_non_expired_sstables() among the non-expired candidates, plus every fully expired
+ // sstable found by this call. Returns an empty descriptor when there are no candidates.
+ virtual compaction_descriptor get_sstables_for_compaction(column_family& cf, std::vector<shared_sstable> candidates) override {
+ auto gc_before = gc_clock::now() - cf.schema()->gc_grace_seconds();
+
+ if (candidates.empty()) {
+ return compaction_descriptor();
+ }
+
+ // Find fully expired SSTables. Those will be included no matter what.
+ std::vector<shared_sstable> expired;
+
+ // The expired check only runs once per expired_sstable_check_frequency.
+ if (db_clock::now() - _last_expired_check > _options.expired_sstable_check_frequency) {
+ clogger.debug("TWCS expired check sufficiently far in the past, checking for fully expired SSTables");
+ expired = get_fully_expired_sstables(cf, candidates, gc_before.time_since_epoch().count());
+ _last_expired_check = db_clock::now();
+ } else {
+ clogger.debug("TWCS skipping check for fully expired SSTables");
+ }
+
+ // Drop the expired sstables from the candidates so they are not picked a second time below.
+ if (!expired.empty()) {
+ auto expired_as_set = boost::copy_range<std::unordered_set<shared_sstable>>(expired);
+ auto is_expired = [&] (const shared_sstable& s) { return expired_as_set.find(s) != expired_as_set.end(); };
+ candidates.erase(boost::remove_if(candidates, is_expired), candidates.end());
+ }
+
+ auto compaction_candidates = get_next_non_expired_sstables(cf, std::move(candidates), gc_before);
+ // Expired sstables are appended even if no other candidate was selected.
+ if (!expired.empty()) {
+ compaction_candidates.insert(compaction_candidates.end(), expired.begin(), expired.end());
+ }
+ return compaction_candidates;
+ }
+private:
+ // Returns the result of get_compaction_candidates() when it is non-empty. Otherwise falls back
+ // to a single sstable: among those for which worth_dropping_tombstones() is true, the one with
+ // the smallest min_timestamp. Returns an empty vector when neither yields anything.
+ std::vector<shared_sstable>
+ get_next_non_expired_sstables(column_family& cf, std::vector<shared_sstable> non_expiring_sstables, gc_clock::time_point gc_before) {
+ auto most_interesting = get_compaction_candidates(cf, non_expiring_sstables);
+
+ if (!most_interesting.empty()) {
+ return most_interesting;
+ }
+
+ // if there is no sstable to compact in standard way, try compacting single sstable whose droppable tombstone
+ // ratio is greater than threshold.
+ auto e = boost::range::remove_if(non_expiring_sstables, [this, &gc_before] (const shared_sstable& sst) -> bool {
+ return !worth_dropping_tombstones(sst, gc_before);
+ });
+ non_expiring_sstables.erase(e, non_expiring_sstables.end());
+ if (non_expiring_sstables.empty()) {
+ return {};
+ }
+ // Of the remaining sstables, pick the one with the smallest min_timestamp.
+ auto it = boost::min_element(non_expiring_sstables, [] (auto& i, auto& j) {
+ return i->get_stats_metadata().min_timestamp < j->get_stats_metadata().min_timestamp;
+ });
+ return { *it };
+ }
+
+ std::vector<shared_sstable> get_compaction_candidates(column_family& cf, std::vector<shared_sstable> candidate_sstables) {
+ auto p = get_buckets(std::move(candidate_sstables), _options.sstable_window_size);
+ // Update the highest window seen, if necessary
+ timestamp_type max_timestamp = 0;
+ // Create map to represent buckets
+ // For each sstable, add sstable to the time bucket
+ // Where the bucket is the file's max timestamp rounded to the nearest window bucket
+ for (auto&& f : files) {
+ timestamp_type ts = f->get_stats_metadata().max_timestamp;
+ timestamp_type lower_bound = get_window_lower_bound(sstable_window_size, ts);
+ buckets[lower_bound].push_back(std::move(f));
+ max_timestamp = std::max(max_timestamp, lower_bound);
+ }
+
+ return std::make_pair(std::move(buckets), max_timestamp);
+ }
+
+ // Walks the buckets from the highest key to the lowest and returns the first non-empty
+ // selection of sstables to compact, or an empty vector when no bucket qualifies.
+ // min_threshold gates buckets with key >= now; max_threshold caps the result for older
+ // buckets. sstable_window_size is not used in this function body.
+ static std::vector<shared_sstable>
+ newest_bucket(std::map<timestamp_type, std::vector<shared_sstable>> buckets, int min_threshold, int max_threshold,
+ std::chrono::seconds sstable_window_size, timestamp_type now) {
+ // If the current bucket has at least minThreshold SSTables, choose that one.
+ // For any other bucket, at least 2 SSTables is enough.
+ // In any case, limit to maxThreshold SSTables.
+
+ // std::map iterates in ascending key order, so the reversed adaptor yields newest first.
+ for (auto&& key_bucket : buckets | boost::adaptors::reversed) {
+ auto key = key_bucket.first;
+ auto& bucket = key_bucket.second;
+
+ clogger.trace("Key {}, now {}", key, now);
+
+ if (bucket.size() >= size_t(min_threshold) && key >= now) {
+ // If we're in the newest bucket, we'll use STCS to prioritize sstables
+ auto stcs_interesting_bucket = size_tiered_most_interesting_bucket(bucket);
+
+ // If the tables in the current bucket aren't eligible in the STCS strategy, we'll skip it and look for other buckets
+ if (!stcs_interesting_bucket.empty()) {
+ return stcs_interesting_bucket;
+ }
+ } else if (bucket.size() >= 2 && key < now) {
+ clogger.debug("bucket size {} >= 2 and not in current bucket, compacting what's here", bucket.size());
+ return trim_to_threshold(std::move(bucket), max_threshold);
+ } else {
+ clogger.debug("No compaction necessary for bucket size {} , key {}, now {}", bucket.size(), key, now);
+ }
+ }
+ return {};
+ }
+
+ // Returns at most max_threshold sstables from the bucket: the ones with the smallest
+ // ondisk_data_size(), in ascending size order.
+ static std::vector<shared_sstable>
+ trim_to_threshold(std::vector<shared_sstable> bucket, int max_threshold) {
+ auto n = std::min(bucket.size(), size_t(max_threshold));
+ // Trim the largest sstables off the end to meet the maxThreshold
+ // Only the first n positions need to end up sorted; the remainder is discarded below.
+ boost::partial_sort(bucket, bucket.begin() + n, [] (auto& i, auto& j) {
+ return i->ondisk_data_size() < j->ondisk_data_size();
+ });
+ bucket.resize(n);
+ return bucket;
+ }
+private:
+ // Recomputes _estimated_remaining_tasks as the number of buckets that currently qualify for
+ // compaction. A bucket whose key is at or after _highest_window_seen qualifies with at least
+ // min_threshold sstables; an older bucket qualifies with at least 2.
+ void update_estimated_compaction_by_tasks(std::map<timestamp_type, std::vector<shared_sstable>>& tasks, int min_threshold) {
+ int64_t n = 0;
+ // "now" is the highest window recorded so far, not the wall-clock time.
+ timestamp_type now = _highest_window_seen;
+
+ // Iterate by reference: only the key and the bucket size are read, so copying each
+ // bucket's vector of sstables would be wasted work.
+ for (const auto& task : tasks) {
+ auto key = task.first;
+
+ // For current window, make sure it's compactable
+ auto count = task.second.size();
+ if (key >= now && count >= size_t(min_threshold)) {
+ n++;
+ } else if (key < now && count >= 2) {
+ n++;
+ }
+ }
+ _estimated_remaining_tasks = n;
+ }
+public:
+ // Returns the estimate last stored by update_estimated_compaction_by_tasks(); cf is not used.
+ virtual int64_t estimated_pending_compactions(column_family& cf) const override {
+ return _estimated_remaining_tasks;
+ }
+
+ // Placeholder: always throws std::runtime_error, as this patch does not yet return a
+ // compaction_strategy_type value for TWCS.
+ virtual compaction_strategy_type type() const {
+ throw std::runtime_error("TWCS not wired yet");
+ }
+};
+
+}
- * @param buckets list of buckets, sorted from newest to oldest, from which to return the newest bucket within thresholds.
- * @param minThreshold minimum number of sstables in a bucket to qualify.
- * @param maxThreshold maximum number of sstables to compact at once (the returned bucket will be trimmed down to this).
- * @return a bucket (list) of sstables to compact.
- */
- @VisibleForTesting
- * @param bucket set of sstables
- * @param maxThreshold maximum number of sstables in a single compaction task.
- * @return A bucket trimmed to the maxThreshold newest sstables.
- */
- @VisibleForTesting
- static List<SSTableReader> trimToThreshold(Set<SSTableReader> bucket, int maxThreshold)
- {
- List<SSTableReader> ssTableReaders = new ArrayList<>(bucket);
-
- // Trim the largest sstables off the end to meet the maxThreshold
- Collections.sort(ssTableReaders, new SSTableReader.SizeComparator());
-
- return ImmutableList.copyOf(Iterables.limit(ssTableReaders, maxThreshold));
- }
-
- @Override
- public synchronized Collection<AbstractCompactionTask> getMaximalTask(int gcBefore, boolean splitOutput)
- {
- Iterable<SSTableReader> filteredSSTables = filterSuspectSSTables(sstables);
- if (Iterables.isEmpty(filteredSSTables))
- return null;
- LifecycleTransaction txn = cfs.getTracker().tryModify(filteredSSTables, OperationType.COMPACTION);
- if (txn == null)
- return null;
- return Collections.singleton(new CompactionTask(cfs, txn, gcBefore));
- }
-
- @Override
- public synchronized AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)
- {
- assert !sstables.isEmpty(); // checked for by CM.submitUserDefined
-
- LifecycleTransaction modifier = cfs.getTracker().tryModify(sstables, OperationType.COMPACTION);
- if (modifier == null)
- {
- logger.debug("Unable to mark {} for compaction; probably a background compaction got to it first. You can disable background compactions temporarily if this is a problem", sstables);
- return null;
- }
-
--
2.9.4

Raphael S. Carvalho

<raphaelsc@scylladb.com>
unread,
Jul 19, 2017, 2:15:25 AM7/19/17
to scylladb-dev@googlegroups.com, Raphael S. Carvalho
Signed-off-by: Raphael S. Carvalho <raph...@scylladb.com>
---
sstables/time_window_compaction_strategy.hh | 54 +++++++++++++++++++++++++++--
1 file changed, 51 insertions(+), 3 deletions(-)

diff --git a/sstables/time_window_compaction_strategy.hh b/sstables/time_window_compaction_strategy.hh
index 8ced34c..152b449 100644
--- a/sstables/time_window_compaction_strategy.hh
+++ b/sstables/time_window_compaction_strategy.hh
@@ -44,6 +44,7 @@
#include "compaction_strategy_impl.hh"
#include "compaction.hh"
#include "timestamp.hh"
+#include "exceptions/exceptions.hh"
#include <boost/range/algorithm/partial_sort.hpp>
#include <boost/range/adaptors.hpp>

@@ -53,15 +54,62 @@ extern logging::logger clogger;

using namespace std::chrono_literals;

-struct time_window_compaction_strategy_options {
+class time_window_compaction_strategy_options {
+private:
static constexpr std::chrono::seconds DEFAULT_COMPACTION_WINDOW_UNIT(int window_size) { return window_size * 86400s; }
static constexpr int DEFAULT_COMPACTION_WINDOW_SIZE = 1;
static constexpr std::chrono::seconds DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS() { return 600s; }

+ static constexpr auto TIMESTAMP_RESOLUTION_KEY = "timestamp_resolution";
+ static constexpr auto COMPACTION_WINDOW_UNIT_KEY = "compaction_window_unit";
+ static constexpr auto COMPACTION_WINDOW_SIZE_KEY = "compaction_window_size";
+ static constexpr auto EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY = "expired_sstable_check_frequency_seconds";
+
+ const std::unordered_map<sstring, std::chrono::seconds> valid_window_units = { { "MINUTES", 60s }, { "HOURS", 3600s }, { "DAYS", 86400s } };
+ // TODO: add support to timestamp resolution other than microseconds, but it's not that important
+ // because new clients only use this one.
+ const std::unordered_set<sstring> valid_timestamp_resolutions = { "MICROSECONDS" };
+
std::chrono::seconds sstable_window_size = DEFAULT_COMPACTION_WINDOW_UNIT(DEFAULT_COMPACTION_WINDOW_SIZE);
db_clock::duration expired_sstable_check_frequency = DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS();
+public:
+ // Builds the TWCS options from the schema's compaction option map. A key that is absent keeps
+ // its default; an unsupported or unparsable value raises exceptions::syntax_exception.
+ time_window_compaction_strategy_options(const std::map<sstring, sstring>& options) {
+ // Unit and size default independently (one day, size 1), so a schema that sets only one
+ // of them still yields a well-defined window.
+ std::chrono::seconds window_unit = DEFAULT_COMPACTION_WINDOW_UNIT(1);
+ int window_size = DEFAULT_COMPACTION_WINDOW_SIZE;
+
+ auto it = options.find(COMPACTION_WINDOW_UNIT_KEY);
+ if (it != options.end()) {
+ auto valid_window_units_it = valid_window_units.find(it->second);
+ if (valid_window_units_it == valid_window_units.end()) {
+ throw exceptions::syntax_exception(sstring("Invalid window unit ") + it->second + " for " + COMPACTION_WINDOW_UNIT_KEY);
+ }
+ window_unit = valid_window_units_it->second;
+ }
+
+ it = options.find(COMPACTION_WINDOW_SIZE_KEY);
+ if (it != options.end()) {
+ try {
+ window_size = std::stoi(it->second);
+ } catch (const std::exception& e) {
+ throw exceptions::syntax_exception(sstring("Invalid integer value ") + it->second + " for " + COMPACTION_WINDOW_SIZE_KEY);
+ }
+ }
+
+ // Combine only after both options were looked at, so that a unit given without a size
+ // (or a size given without a unit) is honoured.
+ sstable_window_size = window_size * window_unit;
+
+ it = options.find(EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY);
+ if (it != options.end()) {
+ try {
+ expired_sstable_check_frequency = std::chrono::seconds(std::stol(it->second));
+ } catch (const std::exception& e) {
+ throw exceptions::syntax_exception(sstring("Invalid long value ") + it->second + " for " + EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY);
+ }
+ }
+
+ // Only validated, never stored: microseconds is the sole supported resolution.
+ it = options.find(TIMESTAMP_RESOLUTION_KEY);
+ if (it != options.end() && !valid_timestamp_resolutions.count(it->second)) {
+ throw exceptions::syntax_exception(sstring("Invalid timestamp resolution ") + it->second + " for " + TIMESTAMP_RESOLUTION_KEY);
+ }
+ }

- // FIXME: override default values with the ones in schema.
+ friend class time_window_compaction_strategy;
};

using timestamp_type = api::timestamp_type;
@@ -73,7 +121,7 @@ class time_window_compaction_strategy : public compaction_strategy_impl {
timestamp_type _highest_window_seen;
public:
time_window_compaction_strategy(const std::map<sstring, sstring>& options)
- : compaction_strategy_impl(options)
+ : compaction_strategy_impl(options), _options(options)
{
if (!options.count(TOMBSTONE_COMPACTION_INTERVAL_OPTION) && !options.count(TOMBSTONE_THRESHOLD_OPTION)) {
_disable_tombstone_compaction = true;
--
2.9.4

Raphael S. Carvalho

<raphaelsc@scylladb.com>
unread,
Jul 19, 2017, 2:15:27 AM7/19/17
to scylladb-dev@googlegroups.com, Raphael S. Carvalho
Signed-off-by: Raphael S. Carvalho <raph...@scylladb.com>
---
diff --git a/sstables/time_window_compaction_strategy.hh b/sstables/time_window_compaction_strategy.hh
index 152b449..8712663 100644
--- a/sstables/time_window_compaction_strategy.hh
+++ b/sstables/time_window_compaction_strategy.hh
@@ -295,7 +295,7 @@ class time_window_compaction_strategy : public compaction_strategy_impl {

Raphael S. Carvalho

<raphaelsc@scylladb.com>
unread,
Jul 19, 2017, 2:15:29 AM7/19/17
to scylladb-dev@googlegroups.com, Raphael S. Carvalho
Signed-off-by: Raphael S. Carvalho <raph...@scylladb.com>
---
tests/sstable_datafile_test.cc | 121 +++++++++++++++++++++++++++++++++++++++++
1 file changed, 121 insertions(+)

diff --git a/tests/sstable_datafile_test.cc b/tests/sstable_datafile_test.cc
index 507ac44..816a539 100644
--- a/tests/sstable_datafile_test.cc
+++ b/tests/sstable_datafile_test.cc
@@ -42,6 +42,7 @@
#include "partition_slice_builder.hh"
#include "sstables/compaction_strategy_impl.hh"
#include "sstables/date_tiered_compaction_strategy.hh"
+#include "sstables/time_window_compaction_strategy.hh"
#include "mutation_assertions.hh"
#include "mutation_reader_assertions.hh"
#include "counters.hh"
@@ -69,6 +70,8 @@ atomic_cell make_atomic_cell(bytes_view value, uint32_t ttl = 0, uint32_t expira
}
}

+static shared_sstable make_sstable_containing(std::function<shared_sstable()> sst_factory, std::vector<mutation> muts);
+
SEASTAR_TEST_CASE(datafile_generation_01) {
// Data file with clustering key
//
@@ -3105,6 +3108,124 @@ SEASTAR_TEST_CASE(date_tiered_strategy_test_2) {
return make_ready_future<>();
}

+// Checks that get_window_lower_bound() rounds a timestamp down to the start of its window for
+// 1 hour, 1 minute, 1 day and 2 day windows.
+SEASTAR_TEST_CASE(time_window_strategy_time_window_tests) {
+ using namespace std::chrono;
+
+ // The literals are milliseconds since the epoch; the stored values are microseconds.
+ api::timestamp_type tstamp1 = duration_cast<microseconds>(milliseconds(1451001601000L)).count(); // 2015-12-25 @ 00:00:01 UTC
+ api::timestamp_type tstamp2 = duration_cast<microseconds>(milliseconds(1451088001000L)).count(); // 2015-12-26 @ 00:00:01 UTC
+ api::timestamp_type low_hour = duration_cast<microseconds>(milliseconds(1451001600000L)).count(); // 2015-12-25 @ 00:00:00 UTC
+
+
+ // A 1 hour window should round down to the beginning of the hour
+ BOOST_REQUIRE(time_window_compaction_strategy::get_window_lower_bound(duration_cast<seconds>(hours(1)), tstamp1) == low_hour);
+
+ // A 1 minute window should round down to the beginning of the minute (00:00:00 for tstamp1)
+ BOOST_REQUIRE(time_window_compaction_strategy::get_window_lower_bound(duration_cast<seconds>(minutes(1)), tstamp1) == low_hour);
+
+ // A 1 day window should round down to the beginning of the day (00:00:00 for tstamp1)
+ BOOST_REQUIRE(time_window_compaction_strategy::get_window_lower_bound(duration_cast<seconds>(hours(24)), tstamp1) == low_hour);
+
+ // The 2 day window of 2015-12-25 + 2015-12-26 should round down to the beginning of 2015-12-25
+ BOOST_REQUIRE(time_window_compaction_strategy::get_window_lower_bound(duration_cast<seconds>(hours(24*2)), tstamp2) == low_hour);
+
+ return make_ready_future<>();
+}
+
+SEASTAR_TEST_CASE(time_window_strategy_correctness_test) {
+ using namespace std::chrono;
+
+ return seastar::async([] {
+ auto s = schema_builder("tests", "time_window_strategy")
+ .with_column("id", utf8_type, column_kind::partition_key)
+ .with_column("value", int32_type).build();
+
+ auto tmp = make_lw_shared<tmpdir>();
+ auto sst_gen = [s, tmp, gen = make_lw_shared<unsigned>(1)] () mutable {
+ return make_lw_shared<sstable>(s, tmp->path, (*gen)++, la, big);
+ };
+
+ auto make_insert = [&] (partition_key key, api::timestamp_type t) {
+ mutation m(key, s);
+ m.set_clustered_cell(clustering_key::make_empty(), bytes("value"), data_value(int32_t(1)), t);
+ return m;
+ };
+
+ api::timestamp_type tstamp = api::timestamp_clock::now().time_since_epoch().count();
+ api::timestamp_type tstamp2 = tstamp - duration_cast<microseconds>(seconds(2L * 3600L)).count();
+
+ std::vector<shared_sstable> sstables;
+
+ // create 5 sstables
+ for (api::timestamp_type t = 0; t < 3; t++) {
+ auto key = partition_key::from_exploded(*s, {to_bytes("key" + to_sstring(t))});
+ auto mut = make_insert(std::move(key), t);
+ sstables.push_back(make_sstable_containing(sst_gen, {std::move(mut)}));
+ }
+ // Decrement the timestamp to simulate a timestamp in the past hour
+ for (api::timestamp_type t = 3; t < 5; t++) {
+ // And add progressively more cells into each sstable
+ auto key = partition_key::from_exploded(*s, {to_bytes("key" + to_sstring(t))});
+ auto mut = make_insert(std::move(key), t);
+ sstables.push_back(make_sstable_containing(sst_gen, {std::move(mut)}));
+ }
+
+ std::map<api::timestamp_type, std::vector<shared_sstable>> buckets;
+

Commit Bot

<bot@cloudius-systems.com>
unread,
Jul 19, 2017, 3:23:32 AM7/19/17
to scylladb-dev@googlegroups.com, Raphael S. Carvalho
From: Raphael S. Carvalho <raph...@scylladb.com>
Committer: Raphael S. Carvalho <raph...@scylladb.com>
Branch: master

sstables: import TimeWindowCompactionStrategy.java

It will later be converted to C++. Imported from the latest scylla-
tools-java repository; checked that nothing is missing.

Signed-off-by: Raphael S. Carvalho <raph...@scylladb.com>

---
diff --git a/sstables/TimeWindowCompactionStrategy.java
b/sstables/TimeWindowCompactionStrategy.java
--- a/sstables/TimeWindowCompactionStrategy.java
+++ b/sstables/TimeWindowCompactionStrategy.java
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ private static final Logger logger =
LoggerFactory.getLogger(TimeWindowCompactionStrategy.class);
+
+ private final TimeWindowCompactionStrategyOptions options;
+ protected volatile int estimatedRemainingTasks;
+ private final Set<SSTableReader> sstables = new HashSet<>();
+ private long lastExpiredCheck;
+ private long highestWindowSeen;
+
+ public TimeWindowCompactionStrategy(ColumnFamilyStore cfs, Map<String,
String> options)
+ {
+ super(cfs, options);
+ this.estimatedRemainingTasks = 0;
+ this.options = new TimeWindowCompactionStrategyOptions(options);
+ if
(!options.containsKey(AbstractCompactionStrategy.TOMBSTONE_COMPACTION_INTERVAL_OPTION)
&& !options.containsKey(AbstractCompactionStrategy.TOMBSTONE_THRESHOLD_OPTION))
+ {
+ disableTombstoneCompactions = true;
+ logger.debug("Disabling tombstone compactions for TWCS");
+ }
+ else
+ logger.debug("Enabling tombstone compactions for TWCS");
+
+ }
+
+ @Override
+ public AbstractCompactionTask getNextBackgroundTask(int gcBefore)
+ {
+ while (true)
+ {
+ List<SSTableReader> latestBucket =
getNextBackgroundSSTables(gcBefore);
+
+ if (latestBucket.isEmpty())
+ return null;
+
+ LifecycleTransaction modifier =
cfs.getTracker().tryModify(latestBucket, OperationType.COMPACTION);
+ if (modifier != null)
+ return new CompactionTask(cfs, modifier, gcBefore);
+ }
+ }
+
+ /**
+ *
+ * @param gcBefore
+ * @return
+ */
+ private synchronized List<SSTableReader>
getNextBackgroundSSTables(final int gcBefore)
+ {
+ if (Iterables.isEmpty(cfs.getSSTables(SSTableSet.LIVE)))
+ return Collections.emptyList();
+
+ Set<SSTableReader> uncompacting =
ImmutableSet.copyOf(filter(cfs.getUncompactingSSTables(),
sstables::contains));
+
+ // Find fully expired SSTables. Those will be included no matter
what.
+ Set<SSTableReader> expired = Collections.emptySet();
+
+ if (System.currentTimeMillis() - lastExpiredCheck >
options.expiredSSTableCheckFrequency)
+ {
+ logger.debug("TWCS expired check sufficiently far in the past,
checking for fully expired SSTables");
+ expired = CompactionController.getFullyExpiredSSTables(cfs,
uncompacting, cfs.getOverlappingLiveSSTables(uncompacting), gcBefore);
+ lastExpiredCheck = System.currentTimeMillis();
+ }
+ else
+ {
+ logger.debug("TWCS skipping check for fully expired SSTables");
+ }
+
+ Set<SSTableReader> candidates =
Sets.newHashSet(filterSuspectSSTables(uncompacting));
+
+ List<SSTableReader> compactionCandidates = new
ArrayList<>(getNextNonExpiredSSTables(Sets.difference(candidates, expired),
gcBefore));
+ if (!expired.isEmpty())
+ {
+ logger.debug("Including expired sstables: {}", expired);
+ compactionCandidates.addAll(expired);
+ }
+
+ return compactionCandidates;
+ }
+
+ private List<SSTableReader>
getNextNonExpiredSSTables(Iterable<SSTableReader> nonExpiringSSTables,
final int gcBefore)
+ {
+ List<SSTableReader> mostInteresting =
getCompactionCandidates(nonExpiringSSTables);
+
+ if (mostInteresting != null)
+ {
+ return mostInteresting;
+ }
+
+ // if there is no sstable to compact in standard way, try
compacting single sstable whose droppable tombstone
+ // ratio is greater than threshold.
+ List<SSTableReader> sstablesWithTombstones = new ArrayList<>();
+ for (SSTableReader sstable : nonExpiringSSTables)
+ {
+ if (worthDroppingTombstones(sstable, gcBefore))
+ sstablesWithTombstones.add(sstable);
+ }
+ if (sstablesWithTombstones.isEmpty())
+ return Collections.emptyList();
+
+ return
Collections.singletonList(Collections.min(sstablesWithTombstones, new
SSTableReader.SizeComparator()));
+ }
+
+ private List<SSTableReader>
getCompactionCandidates(Iterable<SSTableReader> candidateSSTables)
+ {
+ Pair<HashMultimap<Long, SSTableReader>, Long> buckets =
getBuckets(candidateSSTables, options.sstableWindowUnit,
options.sstableWindowSize, options.timestampResolution);
+ // Update the highest window seen, if necessary
+ * Find the lowest and highest timestamps in a given timestamp/unit
pair
+ * Returns milliseconds, caller should adjust accordingly
+ */
+ public static Pair<Long,Long> getWindowBoundsInMillis(TimeUnit
windowTimeUnit, int windowTimeSize, long timestampInMillis)
+ {
+ long lowerTimestamp;
+ long upperTimestamp;
+ long timestampInSeconds =
TimeUnit.SECONDS.convert(timestampInMillis, TimeUnit.MILLISECONDS);
+
+ switch(windowTimeUnit)
+ {
+ case MINUTES:
+ lowerTimestamp = timestampInSeconds -
((timestampInSeconds) % (60 * windowTimeSize));
+ upperTimestamp = (lowerTimestamp + (60L * (windowTimeSize
- 1L))) + 59L;
+ break;
+ case HOURS:
+ lowerTimestamp = timestampInSeconds -
((timestampInSeconds) % (3600 * windowTimeSize));
+ upperTimestamp = (lowerTimestamp + (3600L *
(windowTimeSize - 1L))) + 3599L;
+ break;
+ case DAYS:
+ default:
+ lowerTimestamp = timestampInSeconds -
((timestampInSeconds) % (86400 * windowTimeSize));
+ upperTimestamp = (lowerTimestamp + (86400L *
(windowTimeSize - 1L))) + 86399L;
+ break;
+ }
+
+ return Pair.create(TimeUnit.MILLISECONDS.convert(lowerTimestamp,
TimeUnit.SECONDS),
+ TimeUnit.MILLISECONDS.convert(upperTimestamp,
TimeUnit.SECONDS));
+
+ }
+
+ /**
+ * Group files with similar max timestamp into buckets.
+ *
+ * @param files pairs consisting of a file and its min timestamp
+ * @param sstableWindowUnit
+ * @param sstableWindowSize
+ * @param timestampResolution
+ * @return A pair, where the left element is the bucket representation
(map of timestamp to sstablereader), and the right is the highest timestamp
seen
+ */
+ @VisibleForTesting
+ static Pair<HashMultimap<Long, SSTableReader>, Long>
getBuckets(Iterable<SSTableReader> files, TimeUnit sstableWindowUnit, int
sstableWindowSize, TimeUnit timestampResolution)
+ {
+ HashMultimap<Long, SSTableReader> buckets = HashMultimap.create();
+
+ long maxTimestamp = 0;
+ // Create hash map to represent buckets
+ // For each sstable, add sstable to the time bucket
+ // Where the bucket is the file's max timestamp rounded to the
nearest window bucket
+ for (SSTableReader f : files)
+ {
+ assert
TimeWindowCompactionStrategyOptions.validTimestampTimeUnits.contains(timestampResolution);
+ long tStamp =
TimeUnit.MILLISECONDS.convert(f.getMaxTimestamp(), timestampResolution);
+ Pair<Long,Long> bounds =
getWindowBoundsInMillis(sstableWindowUnit, sstableWindowSize, tStamp);
+ buckets.put(bounds.left, f);
+ if (bounds.left > maxTimestamp)
+ maxTimestamp = bounds.left;
+ }
+
+ logger.trace("buckets {}, max timestamp", buckets, maxTimestamp);
+ return Pair.create(buckets, maxTimestamp);
+ }
+
+ private void updateEstimatedCompactionsByTasks(HashMultimap<Long,
SSTableReader> tasks)
+ {
+ int n = 0;
+ long now = this.highestWindowSeen;
+
+ for(Long key : tasks.keySet())
+ {
+ // For current window, make sure it's compactable
+ if (key.compareTo(now) >= 0 && tasks.get(key).size() >=
cfs.getMinimumCompactionThreshold())
+ n++;
+ else if (key.compareTo(now) < 0 && tasks.get(key).size() >= 2)
+ n++;
+ }
+ this.estimatedRemainingTasks = n;
+ }
+
+
+ /**
+ * @param buckets list of buckets, sorted from newest to oldest, from
which to return the newest bucket within thresholds.
+ * @param minThreshold minimum number of sstables in a bucket to
qualify.
+ * @param maxThreshold maximum number of sstables to compact at once
(the returned bucket will be trimmed down to this).
+ * @return a bucket (list) of sstables to compact.
+ */
+ @VisibleForTesting
+ static List<SSTableReader> newestBucket(HashMultimap<Long,
SSTableReader> buckets, int minThreshold, int maxThreshold, TimeUnit
sstableWindowUnit, int sstableWindowSize,
SizeTieredCompactionStrategyOptions stcsOptions, long now)
+ {
+ // If the current bucket has at least minThreshold SSTables,
choose that one.
+ // For any other bucket, at least 2 SSTables is enough.
+ // In any case, limit to maxThreshold SSTables.
+
+ TreeSet<Long> allKeys = new TreeSet<>(buckets.keySet());
+
+ Iterator<Long> it = allKeys.descendingIterator();
+ while(it.hasNext())
+ {
+ Long key = it.next();
+ Set<SSTableReader> bucket = buckets.get(key);
+ logger.trace("Key {}, now {}", key, now);
+ if (bucket.size() >= minThreshold && key >= now)
+ {
+ // If we're in the newest bucket, we'll use STCS to
prioritize sstables
+ List<Pair<SSTableReader,Long>> pairs =
SizeTieredCompactionStrategy.createSSTableAndLengthPairs(bucket);
+ List<List<SSTableReader>> stcsBuckets =
SizeTieredCompactionStrategy.getBuckets(pairs, stcsOptions.bucketHigh,
stcsOptions.bucketLow, stcsOptions.minSSTableSize);
+ logger.debug("Using STCS compaction for first window of
bucket: data files {} , options {}", pairs, stcsOptions);
+ List<SSTableReader> stcsInterestingBucket =
SizeTieredCompactionStrategy.mostInterestingBucket(stcsBuckets,
minThreshold, maxThreshold);
+
+ // If the tables in the current bucket aren't eligible in
the STCS strategy, we'll skip it and look for other buckets
+ if (!stcsInterestingBucket.isEmpty())
+ return stcsInterestingBucket;
+ }
+ else if (bucket.size() >= 2 && key < now)
+ {
+ logger.debug("bucket size {} >= 2 and not in current
bucket, compacting what's here: {}", bucket.size(), bucket);
+ return trimToThreshold(bucket, maxThreshold);
+ }
+ else
+ {
+ logger.debug("No compaction necessary for bucket size {} ,
key {}, now {}", bucket.size(), key, now);
+ }
+ }
+ return Collections.<SSTableReader>emptyList();
+ }
+
+ /**
+ * @param bucket set of sstables
+ * @param maxThreshold maximum number of sstables in a single
compaction task.
+ * @return A bucket trimmed to the maxThreshold newest sstables.
+ */
+ @VisibleForTesting
+ static List<SSTableReader> trimToThreshold(Set<SSTableReader> bucket,
int maxThreshold)
+ {
+ List<SSTableReader> ssTableReaders = new ArrayList<>(bucket);
+
+ // Trim the largest sstables off the end to meet the maxThreshold
+ Collections.sort(ssTableReaders, new
SSTableReader.SizeComparator());
+
+ return ImmutableList.copyOf(Iterables.limit(ssTableReaders,
maxThreshold));
+ }
+
+ @Override
+ public synchronized Collection<AbstractCompactionTask>
getMaximalTask(int gcBefore, boolean splitOutput)
+ {
+ Iterable<SSTableReader> filteredSSTables =
filterSuspectSSTables(sstables);
+ if (Iterables.isEmpty(filteredSSTables))
+ return null;
+ LifecycleTransaction txn =
cfs.getTracker().tryModify(filteredSSTables, OperationType.COMPACTION);
+ if (txn == null)
+ return null;
+ return Collections.singleton(new CompactionTask(cfs, txn,
gcBefore));
+ }
+
+ @Override
+ public synchronized AbstractCompactionTask
getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)
+ {
+ assert !sstables.isEmpty(); // checked for by CM.submitUserDefined
+
+ LifecycleTransaction modifier =
cfs.getTracker().tryModify(sstables, OperationType.COMPACTION);
+ if (modifier == null)
+ {
+ logger.debug("Unable to mark {} for compaction; probably a
background compaction got to it first. You can disable background
compactions temporarily if this is a problem", sstables);
+ return null;
+ }
+
+ return new CompactionTask(cfs, modifier,
gcBefore).setUserDefined(true);
+ }
+
+ public int getEstimatedRemainingTasks()
+ {
+ return this.estimatedRemainingTasks;
+ }
+
+ public long getMaxSSTableBytes()
+ {
+ return Long.MAX_VALUE;
+ }
+
+
+ public static Map<String, String> validateOptions(Map<String, String>
options) throws ConfigurationException
+ {
+ Map<String, String> uncheckedOptions =
AbstractCompactionStrategy.validateOptions(options);

Commit Bot

<bot@cloudius-systems.com>
unread,
Jul 19, 2017, 3:23:33 AM7/19/17
to scylladb-dev@googlegroups.com, Raphael S. Carvalho
From: Raphael S. Carvalho <raph...@scylladb.com>
Committer: Raphael S. Carvalho <raph...@scylladb.com>
Branch: master

sstables: implement time window compaction strategy

Signed-off-by: Raphael S. Carvalho <raph...@scylladb.com>

---
diff --git a/sstables/TimeWindowCompactionStrategy.java
b/sstables/TimeWindowCompactionStrategy.java
--- a/sstables/TimeWindowCompactionStrategy.java
+++ b/sstables/TimeWindowCompactionStrategy.java
- private static final Logger logger =
LoggerFactory.getLogger(TimeWindowCompactionStrategy.class);
-
- private final TimeWindowCompactionStrategyOptions options;
- protected volatile int estimatedRemainingTasks;
- private final Set<SSTableReader> sstables = new HashSet<>();
- private long lastExpiredCheck;
- private long highestWindowSeen;
-
- public TimeWindowCompactionStrategy(ColumnFamilyStore cfs, Map<String,
String> options)
- {
- super(cfs, options);
- this.estimatedRemainingTasks = 0;
- this.options = new TimeWindowCompactionStrategyOptions(options);
- if
(!options.containsKey(AbstractCompactionStrategy.TOMBSTONE_COMPACTION_INTERVAL_OPTION)
&& !options.containsKey(AbstractCompactionStrategy.TOMBSTONE_THRESHOLD_OPTION))
- {
- disableTombstoneCompactions = true;
- logger.debug("Disabling tombstone compactions for TWCS");
- }
- else
- logger.debug("Enabling tombstone compactions for TWCS");
-
- }
-
- @Override
- public AbstractCompactionTask getNextBackgroundTask(int gcBefore)
- {
- while (true)
- {
- List<SSTableReader> latestBucket =
getNextBackgroundSSTables(gcBefore);
-
- if (latestBucket.isEmpty())
- return null;
-
- LifecycleTransaction modifier =
cfs.getTracker().tryModify(latestBucket, OperationType.COMPACTION);
- if (modifier != null)
- return new CompactionTask(cfs, modifier, gcBefore);
- }
- }
-
- /**
- *
- * @param gcBefore
- * @return
- */
- private synchronized List<SSTableReader>
getNextBackgroundSSTables(final int gcBefore)
- {
- if (Iterables.isEmpty(cfs.getSSTables(SSTableSet.LIVE)))
- return Collections.emptyList();
-
- Set<SSTableReader> uncompacting =
ImmutableSet.copyOf(filter(cfs.getUncompactingSSTables(),
sstables::contains));
-
- // Find fully expired SSTables. Those will be included no matter
what.
- Set<SSTableReader> expired = Collections.emptySet();
-
- if (System.currentTimeMillis() - lastExpiredCheck >
options.expiredSSTableCheckFrequency)
- {
- logger.debug("TWCS expired check sufficiently far in the past,
checking for fully expired SSTables");
- expired = CompactionController.getFullyExpiredSSTables(cfs,
uncompacting, cfs.getOverlappingLiveSSTables(uncompacting), gcBefore);
- lastExpiredCheck = System.currentTimeMillis();
- }
- else
- {
- logger.debug("TWCS skipping check for fully expired SSTables");
- }
-
- Set<SSTableReader> candidates =
Sets.newHashSet(filterSuspectSSTables(uncompacting));
-
- List<SSTableReader> compactionCandidates = new
ArrayList<>(getNextNonExpiredSSTables(Sets.difference(candidates, expired),
gcBefore));
- if (!expired.isEmpty())
- {
- logger.debug("Including expired sstables: {}", expired);
- compactionCandidates.addAll(expired);
- }
-
- return compactionCandidates;
- }
-
- private List<SSTableReader>
getNextNonExpiredSSTables(Iterable<SSTableReader> nonExpiringSSTables,
final int gcBefore)
- {
- List<SSTableReader> mostInteresting =
getCompactionCandidates(nonExpiringSSTables);
-
- if (mostInteresting != null)
- {
- return mostInteresting;
- }
-
- // if there is no sstable to compact in standard way, try
compacting single sstable whose droppable tombstone
- // ratio is greater than threshold.
- List<SSTableReader> sstablesWithTombstones = new ArrayList<>();
- for (SSTableReader sstable : nonExpiringSSTables)
- {
- if (worthDroppingTombstones(sstable, gcBefore))
- sstablesWithTombstones.add(sstable);
- }
- if (sstablesWithTombstones.isEmpty())
- return Collections.emptyList();
-
- return
Collections.singletonList(Collections.min(sstablesWithTombstones, new
SSTableReader.SizeComparator()));
- }
-
- private List<SSTableReader>
getCompactionCandidates(Iterable<SSTableReader> candidateSSTables)
- {
- Pair<HashMultimap<Long, SSTableReader>, Long> buckets =
getBuckets(candidateSSTables, options.sstableWindowUnit,
options.sstableWindowSize, options.timestampResolution);
- * Find the lowest and highest timestamps in a given timestamp/unit
pair
- * Returns milliseconds, caller should adjust accordingly
- */
- public static Pair<Long,Long> getWindowBoundsInMillis(TimeUnit
windowTimeUnit, int windowTimeSize, long timestampInMillis)
- {
- long lowerTimestamp;
- long upperTimestamp;
- long timestampInSeconds =
TimeUnit.SECONDS.convert(timestampInMillis, TimeUnit.MILLISECONDS);
-
- switch(windowTimeUnit)
- {
- case MINUTES:
- lowerTimestamp = timestampInSeconds -
((timestampInSeconds) % (60 * windowTimeSize));
- upperTimestamp = (lowerTimestamp + (60L * (windowTimeSize
- 1L))) + 59L;
- break;
- case HOURS:
- lowerTimestamp = timestampInSeconds -
((timestampInSeconds) % (3600 * windowTimeSize));
- upperTimestamp = (lowerTimestamp + (3600L *
(windowTimeSize - 1L))) + 3599L;
- break;
- case DAYS:
- default:
- lowerTimestamp = timestampInSeconds -
((timestampInSeconds) % (86400 * windowTimeSize));
- upperTimestamp = (lowerTimestamp + (86400L *
(windowTimeSize - 1L))) + 86399L;
- break;
- }
-
- return Pair.create(TimeUnit.MILLISECONDS.convert(lowerTimestamp,
TimeUnit.SECONDS),
- TimeUnit.MILLISECONDS.convert(upperTimestamp,
TimeUnit.SECONDS));
-
- }
-
- /**
- * Group files with similar max timestamp into buckets.
- *
- * @param files pairs consisting of a file and its min timestamp
- * @param sstableWindowUnit
- * @param sstableWindowSize
- * @param timestampResolution
- * @return A pair, where the left element is the bucket representation
(map of timestamp to sstablereader), and the right is the highest timestamp
seen
- */
- @VisibleForTesting
- static Pair<HashMultimap<Long, SSTableReader>, Long>
getBuckets(Iterable<SSTableReader> files, TimeUnit sstableWindowUnit, int
sstableWindowSize, TimeUnit timestampResolution)
- {
- HashMultimap<Long, SSTableReader> buckets = HashMultimap.create();
-
- long maxTimestamp = 0;
- // Create hash map to represent buckets
- // For each sstable, add sstable to the time bucket
- // Where the bucket is the file's max timestamp rounded to the
nearest window bucket
- for (SSTableReader f : files)
- {
- assert
TimeWindowCompactionStrategyOptions.validTimestampTimeUnits.contains(timestampResolution);
- long tStamp =
TimeUnit.MILLISECONDS.convert(f.getMaxTimestamp(), timestampResolution);
- Pair<Long,Long> bounds =
getWindowBoundsInMillis(sstableWindowUnit, sstableWindowSize, tStamp);
- buckets.put(bounds.left, f);
- if (bounds.left > maxTimestamp)
- maxTimestamp = bounds.left;
- }
-
- logger.trace("buckets {}, max timestamp", buckets, maxTimestamp);
- return Pair.create(buckets, maxTimestamp);
- }
-
- private void updateEstimatedCompactionsByTasks(HashMultimap<Long,
SSTableReader> tasks)
- {
- int n = 0;
- long now = this.highestWindowSeen;
-
- for(Long key : tasks.keySet())
- {
- // For current window, make sure it's compactable
- if (key.compareTo(now) >= 0 && tasks.get(key).size() >=
cfs.getMinimumCompactionThreshold())
- n++;
- else if (key.compareTo(now) < 0 && tasks.get(key).size() >= 2)
- n++;
- }
- this.estimatedRemainingTasks = n;
- }
-
-
- /**
- * @param buckets list of buckets, sorted from newest to oldest, from
which to return the newest bucket within thresholds.
- * @param minThreshold minimum number of sstables in a bucket to
qualify.
- * @param maxThreshold maximum number of sstables to compact at once
(the returned bucket will be trimmed down to this).
- * @return a bucket (list) of sstables to compact.
- */
- @VisibleForTesting
- static List<SSTableReader> newestBucket(HashMultimap<Long,
SSTableReader> buckets, int minThreshold, int maxThreshold, TimeUnit
sstableWindowUnit, int sstableWindowSize,
SizeTieredCompactionStrategyOptions stcsOptions, long now)
- {
- // If the current bucket has at least minThreshold SSTables,
choose that one.
- // For any other bucket, at least 2 SSTables is enough.
- // In any case, limit to maxThreshold SSTables.
-
- TreeSet<Long> allKeys = new TreeSet<>(buckets.keySet());
-
- Iterator<Long> it = allKeys.descendingIterator();
- while(it.hasNext())
- {
- Long key = it.next();
- Set<SSTableReader> bucket = buckets.get(key);
- logger.trace("Key {}, now {}", key, now);
- if (bucket.size() >= minThreshold && key >= now)
- {
- // If we're in the newest bucket, we'll use STCS to
prioritize sstables
- List<Pair<SSTableReader,Long>> pairs =
SizeTieredCompactionStrategy.createSSTableAndLengthPairs(bucket);
- List<List<SSTableReader>> stcsBuckets =
SizeTieredCompactionStrategy.getBuckets(pairs, stcsOptions.bucketHigh,
stcsOptions.bucketLow, stcsOptions.minSSTableSize);
- logger.debug("Using STCS compaction for first window of
bucket: data files {} , options {}", pairs, stcsOptions);
- List<SSTableReader> stcsInterestingBucket =
SizeTieredCompactionStrategy.mostInterestingBucket(stcsBuckets,
minThreshold, maxThreshold);
-
- // If the tables in the current bucket aren't eligible in
the STCS strategy, we'll skip it and look for other buckets
- if (!stcsInterestingBucket.isEmpty())
- return stcsInterestingBucket;
- }
- else if (bucket.size() >= 2 && key < now)
- {
- logger.debug("bucket size {} >= 2 and not in current
bucket, compacting what's here: {}", bucket.size(), bucket);
- return trimToThreshold(bucket, maxThreshold);
- }
- else
- {
- logger.debug("No compaction necessary for bucket size {} ,
key {}, now {}", bucket.size(), key, now);
- }
- }
- return Collections.<SSTableReader>emptyList();
- }
-
- /**
- * @param bucket set of sstables
- * @param maxThreshold maximum number of sstables in a single
compaction task.
- * @return A bucket trimmed to the maxThreshold newest sstables.
- */
- @VisibleForTesting
- static List<SSTableReader> trimToThreshold(Set<SSTableReader> bucket,
int maxThreshold)
- {
- List<SSTableReader> ssTableReaders = new ArrayList<>(bucket);
-
- // Trim the largest sstables off the end to meet the maxThreshold
- Collections.sort(ssTableReaders, new
SSTableReader.SizeComparator());
-
- return ImmutableList.copyOf(Iterables.limit(ssTableReaders,
maxThreshold));
- }
-
- @Override
- public synchronized Collection<AbstractCompactionTask>
getMaximalTask(int gcBefore, boolean splitOutput)
- {
- Iterable<SSTableReader> filteredSSTables =
filterSuspectSSTables(sstables);
- if (Iterables.isEmpty(filteredSSTables))
- return null;
- LifecycleTransaction txn =
cfs.getTracker().tryModify(filteredSSTables, OperationType.COMPACTION);
- if (txn == null)
- return null;
- return Collections.singleton(new CompactionTask(cfs, txn,
gcBefore));
- }
-
- @Override
- public synchronized AbstractCompactionTask
getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)
- {
- assert !sstables.isEmpty(); // checked for by CM.submitUserDefined
-
- LifecycleTransaction modifier =
cfs.getTracker().tryModify(sstables, OperationType.COMPACTION);
- if (modifier == null)
- {
- logger.debug("Unable to mark {} for compaction; probably a
background compaction got to it first. You can disable background
compactions temporarily if this is a problem", sstables);
- return null;
- }
-
- return new CompactionTask(cfs, modifier,
gcBefore).setUserDefined(true);
- }
-
- public int getEstimatedRemainingTasks()
- {
- return this.estimatedRemainingTasks;
- }
-
- public long getMaxSSTableBytes()
- {
- return Long.MAX_VALUE;
- }
-
-
- public static Map<String, String> validateOptions(Map<String, String>
options) throws ConfigurationException
- {
- Map<String, String> uncheckedOptions =
AbstractCompactionStrategy.validateOptions(options);
- uncheckedOptions =
TimeWindowCompactionStrategyOptions.validateOptions(options,
uncheckedOptions);
-
-
uncheckedOptions.remove(CompactionParams.Option.MIN_THRESHOLD.toString());
-
uncheckedOptions.remove(CompactionParams.Option.MAX_THRESHOLD.toString());
-
- return uncheckedOptions;
- }
-
- public String toString()
- {
- return String.format("TimeWindowCompactionStrategy[%s/%s]",
- cfs.getMinimumCompactionThreshold(),
- cfs.getMaximumCompactionThreshold());
- }
-}
diff --git a/sstables/time_window_compaction_strategy.hh
b/sstables/time_window_compaction_strategy.hh
--- a/sstables/time_window_compaction_strategy.hh
+++ b/sstables/time_window_compaction_strategy.hh
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ static constexpr std::chrono::seconds
DEFAULT_COMPACTION_WINDOW_UNIT(int window_size) { return window_size *
86400s; }
+ static constexpr int DEFAULT_COMPACTION_WINDOW_SIZE = 1;
+ static constexpr std::chrono::seconds
DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS() { return 600s; }
+
+ std::chrono::seconds sstable_window_size =
DEFAULT_COMPACTION_WINDOW_UNIT(DEFAULT_COMPACTION_WINDOW_SIZE);
+ db_clock::duration expired_sstable_check_frequency =
DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS();
+
+ // FIXME: override default values with the ones in schema.
+};
+
+using timestamp_type = api::timestamp_type;
+
+class time_window_compaction_strategy : public compaction_strategy_impl {
+ time_window_compaction_strategy_options _options;
+ int64_t _estimated_remaining_tasks = 0;
+ db_clock::time_point _last_expired_check;
+ timestamp_type _highest_window_seen;
+public:
+ time_window_compaction_strategy(const std::map<sstring, sstring>&
options)
+ : compaction_strategy_impl(options)
+ {
+ if (!options.count(TOMBSTONE_COMPACTION_INTERVAL_OPTION)
&& !options.count(TOMBSTONE_THRESHOLD_OPTION)) {
+ _disable_tombstone_compaction = true;
+ clogger.debug("Disabling tombstone compactions for TWCS");
+ } else {
+ clogger.debug("Enabling tombstone compactions for TWCS");
+ }
+ _use_clustering_key_filter = true;
+ }
+
+ virtual compaction_descriptor
get_sstables_for_compaction(column_family& cf, std::vector<shared_sstable>
candidates) override {
+ auto gc_before = gc_clock::now() - cf.schema()->gc_grace_seconds();
+
+ if (candidates.empty()) {
+ return compaction_descriptor();
+ }
+
+ // Find fully expired SSTables. Those will be included no matter
what.
+ std::vector<shared_sstable> expired;
+
+ if (db_clock::now() - _last_expired_check >
_options.expired_sstable_check_frequency) {
+ clogger.debug("TWCS expired check sufficiently far in the
past, checking for fully expired SSTables");
+ expired = get_fully_expired_sstables(cf, candidates,
gc_before.time_since_epoch().count());
+ _last_expired_check = db_clock::now();
+ } else {
+ clogger.debug("TWCS skipping check for fully expired
SSTables");
+ }
+
+ }
+
+ // if there is no sstable to compact in standard way, try
compacting single sstable whose droppable tombstone
+ // ratio is greater than threshold.
+ auto e = boost::range::remove_if(non_expiring_sstables, [this,
&gc_before] (const shared_sstable& sst) -> bool {
+ return !worth_dropping_tombstones(sst, gc_before);
+ });
+ non_expiring_sstables.erase(e, non_expiring_sstables.end());
+ if (non_expiring_sstables.empty()) {
+ return {};
+ }
+ auto it = boost::min_element(non_expiring_sstables, [] (auto& i,
auto& j) {
+ return i->get_stats_metadata().min_timestamp <
j->get_stats_metadata().min_timestamp;
+ });
+ return { *it };
+ }
+
+ std::vector<shared_sstable> get_compaction_candidates(column_family&
cf, std::vector<shared_sstable> candidate_sstables) {
+ auto p = get_buckets(std::move(candidate_sstables),
_options.sstable_window_size);
+ // Update the highest window seen, if necessary
+ _highest_window_seen = std::max(_highest_window_seen, p.second);
+
+ update_estimated_compaction_by_tasks(p.first,
cf.schema()->min_compaction_threshold());
+
+ return newest_bucket(std::move(p.first),
cf.schema()->min_compaction_threshold(),
cf.schema()->max_compaction_threshold(),
+ _options.sstable_window_size, _highest_window_seen);
+ }
+public:
+ // Find the lowest timestamp for window of given size
+ static timestamp_type
+ get_window_lower_bound(std::chrono::seconds sstable_window_size,
timestamp_type timestamp) {
+ using namespace std::chrono;
+ auto timestamp_in_sec =
duration_cast<seconds>(microseconds(timestamp)).count();
+
+ // mask out window size from timestamp to get lower bound of its
window
+ auto window_lower_bound_in_sec = seconds(timestamp_in_sec -
(timestamp_in_sec % sstable_window_size.count()));
+
+ return
timestamp_type(duration_cast<microseconds>(window_lower_bound_in_sec).count());
+ }
+
+ // Group files with similar max timestamp into buckets.
+ // @return A pair, where the left element is the bucket representation
(map of timestamp to sstablereader),
+ // and the right is the highest timestamp seen
+ static std::pair<std::map<timestamp_type,
std::vector<shared_sstable>>, timestamp_type>
+ get_buckets(std::vector<shared_sstable> files, std::chrono::seconds
sstable_window_size) {
+ std::map<timestamp_type, std::vector<shared_sstable>> buckets;
+
+ timestamp_type max_timestamp = 0;
+ // Create map to represent buckets
+ // For each sstable, add sstable to the time bucket
+ // Where the bucket is the file's max timestamp rounded to the
nearest window bucket
+ for (auto&& f : files) {
+ timestamp_type ts = f->get_stats_metadata().max_timestamp;
+ timestamp_type lower_bound =
get_window_lower_bound(sstable_window_size, ts);
+ buckets[lower_bound].push_back(std::move(f));
+ max_timestamp = std::max(max_timestamp, lower_bound);
+ }
+
+ return std::make_pair(std::move(buckets), max_timestamp);
+ }
+
+ static std::vector<shared_sstable>
+ newest_bucket(std::map<timestamp_type, std::vector<shared_sstable>>
buckets, int min_threshold, int max_threshold,
+ std::chrono::seconds sstable_window_size, timestamp_type now) {
+ // If the current bucket has at least minThreshold SSTables,
choose that one.
+ // For any other bucket, at least 2 SSTables is enough.
+ // In any case, limit to maxThreshold SSTables.
+
+ for (auto&& key_bucket : buckets | boost::adaptors::reversed) {
+ auto key = key_bucket.first;
+ auto& bucket = key_bucket.second;
+
+ clogger.trace("Key {}, now {}", key, now);
+
+ if (bucket.size() >= size_t(min_threshold) && key >= now) {
+ // If we're in the newest bucket, we'll use STCS to
prioritize sstables
+ auto stcs_interesting_bucket =
size_tiered_most_interesting_bucket(bucket);
+
+ // If the tables in the current bucket aren't eligible in
the STCS strategy, we'll skip it and look for other buckets
+ if (!stcs_interesting_bucket.empty()) {
+ return stcs_interesting_bucket;
+ }
+ } else if (bucket.size() >= 2 && key < now) {
+ clogger.debug("bucket size {} >= 2 and not in current
bucket, compacting what's here", bucket.size());
+ return trim_to_threshold(std::move(bucket), max_threshold);
+ } else {
+ clogger.debug("No compaction necessary for bucket size
{} , key {}, now {}", bucket.size(), key, now);
+ }
+ }
+ return {};
+ }
+
+ static std::vector<shared_sstable>
+ trim_to_threshold(std::vector<shared_sstable> bucket, int
max_threshold) {
+ auto n = std::min(bucket.size(), size_t(max_threshold));
+ // Trim the largest sstables off the end to meet the maxThreshold
+ boost::partial_sort(bucket, bucket.begin() + n, [] (auto& i, auto&
j) {
+ return i->ondisk_data_size() < j->ondisk_data_size();
+ });
+ bucket.resize(n);
+ return bucket;
+ }
+private:
+ void update_estimated_compaction_by_tasks(std::map<timestamp_type,
std::vector<shared_sstable>>& tasks, int min_threshold) {
+ int64_t n = 0;
+ timestamp_type now = _highest_window_seen;
+
+ for (auto task : tasks) {
+ auto key = task.first;
+
+ // For current window, make sure it's compactable
+ auto count = task.second.size();
+ if (key >= now && count >= size_t(min_threshold)) {
+ n++;
+ } else if (key < now && count >= 2) {
+ n++;
+ }
+ }
+ _estimated_remaining_tasks = n;
+ }
+public:
+ virtual int64_t estimated_pending_compactions(column_family& cf) const
override {
+ return _estimated_remaining_tasks;
+ }
+
+ virtual compaction_strategy_type type() const {
+ throw std::runtime_error("TWCS not wired yet");
+ }
+};
+
+}

Commit Bot

<bot@cloudius-systems.com>
unread,
Jul 19, 2017, 3:23:34 AM7/19/17
to scylladb-dev@googlegroups.com, Raphael S. Carvalho
From: Raphael S. Carvalho <raph...@scylladb.com>
Committer: Raphael S. Carvalho <raph...@scylladb.com>
Branch: master

compaction/twcs: override default values with options in schema

Signed-off-by: Raphael S. Carvalho <raph...@scylladb.com>

---
diff --git a/sstables/time_window_compaction_strategy.hh
b/sstables/time_window_compaction_strategy.hh
--- a/sstables/time_window_compaction_strategy.hh
+++ b/sstables/time_window_compaction_strategy.hh
@@ -44,6 +44,7 @@
#include "compaction_strategy_impl.hh"
#include "compaction.hh"
#include "timestamp.hh"
+#include "exceptions/exceptions.hh"
#include <boost/range/algorithm/partial_sort.hpp>
#include <boost/range/adaptors.hpp>

@@ -53,15 +54,62 @@ extern logging::logger clogger;

using namespace std::chrono_literals;

-struct time_window_compaction_strategy_options {
+class time_window_compaction_strategy_options {
+private:
static constexpr std::chrono::seconds
DEFAULT_COMPACTION_WINDOW_UNIT(int window_size) { return window_size *
86400s; }
static constexpr int DEFAULT_COMPACTION_WINDOW_SIZE = 1;
static constexpr std::chrono::seconds
DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS() { return 600s; }

+ static constexpr auto TIMESTAMP_RESOLUTION_KEY
= "timestamp_resolution";
+ static constexpr auto COMPACTION_WINDOW_UNIT_KEY
= "compaction_window_unit";
+ static constexpr auto COMPACTION_WINDOW_SIZE_KEY
= "compaction_window_size";
+ static constexpr auto EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY
= "expired_sstable_check_frequency_seconds";
+
+ const std::unordered_map<sstring, std::chrono::seconds>
valid_window_units = { { "MINUTES", 60s }, { "HOURS", 3600s }, { "DAYS",
86400s } };
+ // TODO: add support to timestamp resolution other than microseconds,
but it's not that important
+ // because new clients only use this one.
+ const std::unordered_set<sstring> valid_timestamp_resolutions =
{ "MICROSECONDS" };
+
std::chrono::seconds sstable_window_size =
DEFAULT_COMPACTION_WINDOW_UNIT(DEFAULT_COMPACTION_WINDOW_SIZE);
db_clock::duration expired_sstable_check_frequency =
DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS();
+public:
+ time_window_compaction_strategy_options(const std::map<sstring,
sstring>& options) {
+ std::chrono::seconds window_unit;
+
@@ -73,7 +121,7 @@ class time_window_compaction_strategy : public
compaction_strategy_impl {
timestamp_type _highest_window_seen;
public:
time_window_compaction_strategy(const std::map<sstring, sstring>&
options)

Commit Bot

<bot@cloudius-systems.com>
unread,
Jul 19, 2017, 3:23:36 AM7/19/17
to scylladb-dev@googlegroups.com, Raphael S. Carvalho
From: Raphael S. Carvalho <raph...@scylladb.com>
Committer: Raphael S. Carvalho <raph...@scylladb.com>
Branch: master

compaction: wire up time window compaction strategy

Signed-off-by: Raphael S. Carvalho <raph...@scylladb.com>

---
diff --git a/compaction_strategy.hh b/compaction_strategy.hh
--- a/compaction_strategy.hh
+++ b/compaction_strategy.hh
@@ -33,6 +33,7 @@ enum class compaction_strategy_type {
size_tiered,
leveled,
date_tiered,
+ time_window,
};

class compaction_strategy_impl;
@@ -82,6 +83,8 @@ public:
return "LeveledCompactionStrategy";
case compaction_strategy_type::date_tiered:
return "DateTieredCompactionStrategy";
+ case compaction_strategy_type::time_window:
+ return "TimeWindowCompactionStrategy";
default:
throw std::runtime_error("Invalid Compaction Strategy");
}
@@ -100,6 +103,8 @@ public:
return compaction_strategy_type::leveled;
} else if (short_name == "DateTieredCompactionStrategy") {
return compaction_strategy_type::date_tiered;
+ } else if (short_name == "TimeWindowCompactionStrategy") {
+ return compaction_strategy_type::time_window;
} else {
throw exceptions::configuration_exception(sprint("Unable to
find compaction strategy class '%s'", name));
}
diff --git a/sstables/compaction_strategy.cc
b/sstables/compaction_strategy.cc
--- a/sstables/compaction_strategy.cc
+++ b/sstables/compaction_strategy.cc
@@ -55,6 +55,7 @@
#include "size_tiered_compaction_strategy.hh"
#include "date_tiered_compaction_strategy.hh"
#include "leveled_compaction_strategy.hh"
+#include "time_window_compaction_strategy.hh"

logging::logger date_tiered_manifest::logger =
logging::logger("DateTieredCompactionStrategy");
logging::logger leveled_manifest::logger("LeveledManifest");
@@ -456,6 +457,9 @@ compaction_strategy
make_compaction_strategy(compaction_strategy_type strategy,
case compaction_strategy_type::date_tiered:
impl =
make_shared<date_tiered_compaction_strategy>(date_tiered_compaction_strategy(options));
break;
+ case compaction_strategy_type::time_window:
+ impl =
make_shared<time_window_compaction_strategy>(time_window_compaction_strategy(options));
+ break;
default:
throw std::runtime_error("strategy not supported");
}
diff --git a/sstables/time_window_compaction_strategy.hh
b/sstables/time_window_compaction_strategy.hh
--- a/sstables/time_window_compaction_strategy.hh
+++ b/sstables/time_window_compaction_strategy.hh
@@ -295,7 +295,7 @@ public:
}

virtual compaction_strategy_type type() const {
- throw std::runtime_error("TWCS not wired yet");
+ return compaction_strategy_type::time_window;
}
};

Commit Bot

<bot@cloudius-systems.com>
unread,
Jul 19, 2017, 3:23:37 AM7/19/17
to scylladb-dev@googlegroups.com, Raphael S. Carvalho
From: Raphael S. Carvalho <raph...@scylladb.com>
Committer: Raphael S. Carvalho <raph...@scylladb.com>
Branch: master

tests: add tests for time window compaction strategy

Signed-off-by: Raphael S. Carvalho <raph...@scylladb.com>

---
diff --git a/tests/sstable_datafile_test.cc b/tests/sstable_datafile_test.cc
--- a/tests/sstable_datafile_test.cc
+++ b/tests/sstable_datafile_test.cc
@@ -42,6 +42,7 @@
#include "partition_slice_builder.hh"
#include "sstables/compaction_strategy_impl.hh"
#include "sstables/date_tiered_compaction_strategy.hh"
+#include "sstables/time_window_compaction_strategy.hh"
#include "mutation_assertions.hh"
#include "mutation_reader_assertions.hh"
#include "counters.hh"
@@ -69,6 +70,8 @@ atomic_cell make_atomic_cell(bytes_view value, uint32_t
ttl = 0, uint32_t expira
}
}

+static shared_sstable
make_sstable_containing(std::function<shared_sstable()> sst_factory,
std::vector<mutation> muts);
+
SEASTAR_TEST_CASE(datafile_generation_01) {
// Data file with clustering key
//
@@ -3105,6 +3108,124 @@ SEASTAR_TEST_CASE(date_tiered_strategy_test_2) {
return make_ready_future<>();
}

+SEASTAR_TEST_CASE(time_window_strategy_time_window_tests) {
+ using namespace std::chrono;
+
+ using namespace std::chrono;
+
+ return seastar::async([] {
+ auto s = schema_builder("tests", "time_window_strategy")
+ .with_column("id", utf8_type, column_kind::partition_key)
+ .with_column("value", int32_type).build();
+
+ auto tmp = make_lw_shared<tmpdir>();
+ auto sst_gen = [s, tmp, gen = make_lw_shared<unsigned>(1)] ()
mutable {
+ return make_lw_shared<sstable>(s, tmp->path, (*gen)++, la,
big);
+ };
+
+ auto make_insert = [&] (partition_key key, api::timestamp_type t) {
+ mutation m(key, s);
+ m.set_clustered_cell(clustering_key::make_empty(),
bytes("value"), data_value(int32_t(1)), t);
+ return m;
+ };
+
+ api::timestamp_type tstamp =
api::timestamp_clock::now().time_since_epoch().count();
+ api::timestamp_type tstamp2 = tstamp -
duration_cast<microseconds>(seconds(2L * 3600L)).count();
+
+ std::vector<shared_sstable> sstables;
+
+ // create 5 sstables
+ for (api::timestamp_type t = 0; t < 3; t++) {
+ auto key = partition_key::from_exploded(*s, {to_bytes("key" +
to_sstring(t))});
+ auto mut = make_insert(std::move(key), t);
+ sstables.push_back(make_sstable_containing(sst_gen,
{std::move(mut)}));
+ }
+ // Decrement the timestamp to simulate a timestamp in the past hour
+ for (api::timestamp_type t = 3; t < 5; t++) {
+ // And add progressively more cells into each sstable
+ auto key = partition_key::from_exploded(*s, {to_bytes("key" +
to_sstring(t))});
+ auto mut = make_insert(std::move(key), t);
+ sstables.push_back(make_sstable_containing(sst_gen,
{std::move(mut)}));
+ }
+
+ std::map<api::timestamp_type, std::vector<shared_sstable>> buckets;
+
Reply all
Reply to author
Forward
0 new messages