For more details,
https://issues.apache.org/jira/browse/CASSANDRA-9666
Fixes #1432.
sstables/time_window_compaction_strategy.hh | 258 +++++++++++++++++++
sstables/TimeWindowCompactionStrategy.java | 380 ----------------------------
2 files changed, 258 insertions(+), 380 deletions(-)
create mode 100644 sstables/time_window_compaction_strategy.hh
delete mode 100644 sstables/TimeWindowCompactionStrategy.java
diff --git a/sstables/time_window_compaction_strategy.hh b/sstables/time_window_compaction_strategy.hh
new file mode 100644
index 0000000..f213c01
--- /dev/null
+++ b/sstables/time_window_compaction_strategy.hh
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Copyright (C) 2017 ScyllaDB
+ *
+ * Modified by ScyllaDB
+ */
+
+/*
+ * This file is part of Scylla.
+ *
+ * Scylla is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Scylla is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Scylla. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include "compaction_strategy_impl.hh"
+#include "compaction.hh"
+#include "timestamp.hh"
+#include <algorithm>
+#include <chrono>
+#include <iterator>
+#include <map>
+#include <stdexcept>
+#include <utility>
+#include <vector>
+#include <boost/range/algorithm/min_element.hpp>
+#include <boost/range/algorithm/partial_sort.hpp>
+#include <boost/range/algorithm/remove_if.hpp>
+#include <boost/range/algorithm/set_algorithm.hpp>
+#include <boost/range/algorithm/sort.hpp>
+
+namespace sstables {
+
+extern logging::logger clogger;
+
+// Tunables for time_window_compaction_strategy. Only the defaults below are used
+// for now; see the FIXME.
+struct time_window_compaction_strategy_options {
+    // Duration of `window_size` days. Scaling a std::chrono::seconds value (whose rep
+    // is wider than int) instead of computing 86400*window_size in int avoids int overflow.
+    static constexpr std::chrono::seconds DEFAULT_COMPACTION_WINDOW_UNIT(int window_size) { return std::chrono::seconds(86400) * window_size; }
+    static constexpr int DEFAULT_COMPACTION_WINDOW_SIZE = 1;
+    static constexpr std::chrono::seconds DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS() { return std::chrono::seconds(60*10); }
+
+    // Width of each time window (one day by default).
+    std::chrono::seconds sstable_window_size = DEFAULT_COMPACTION_WINDOW_UNIT(DEFAULT_COMPACTION_WINDOW_SIZE);
+    // Minimum interval between scans for fully expired sstables (10 minutes by default).
+    db_clock::duration expired_sstable_check_frequency = DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS();
+
+    // FIXME: override default values with the ones in schema.
+};
+
+using timestamp_type = api::timestamp_type;
+
+// Port of Cassandra's TimeWindowCompactionStrategy (CASSANDRA-9666).
+//
+// SSTables are grouped into fixed-size time windows keyed by the lower bound of the
+// window containing their max timestamp. The newest window is compacted with
+// size-tiered selection once it holds min_compaction_threshold sstables; an older
+// window is compacted once it holds at least two sstables (trimmed to
+// max_compaction_threshold). Fully expired sstables found by a rate-limited check
+// are added to whatever is selected.
+class time_window_compaction_strategy : public compaction_strategy_impl {
+    time_window_compaction_strategy_options _options;
+    // Number of windows found compactable by the last selection pass.
+    int64_t _estimated_remaining_tasks = 0;
+    // Time of the last scan for fully expired sstables (the clock's epoch before the first scan).
+    db_clock::time_point _last_expired_check;
+    // Lower bound of the newest window seen so far. It must be initialized because
+    // std::max() reads it before any assignment (Cassandra's field starts at 0).
+    timestamp_type _highest_window_seen = 0;
+public:
+    time_window_compaction_strategy(const std::map<sstring, sstring>& options)
+        : compaction_strategy_impl(options)
+    {
+        // As in Cassandra, tombstone compactions stay off unless one of the
+        // tombstone compaction options was set explicitly.
+        if (!options.count(TOMBSTONE_COMPACTION_INTERVAL_OPTION) && !options.count(TOMBSTONE_THRESHOLD_OPTION)) {
+            _disable_tombstone_compaction = true;
+            clogger.debug("Disabling tombstone compactions for TWCS");
+        } else {
+            clogger.debug("Enabling tombstone compactions for TWCS");
+        }
+        _use_clustering_key_filter = true;
+    }
+
+    // Chooses the sstables for the next background compaction among `candidates`: the
+    // time-window selection (or a single sstable worth dropping tombstones from) plus any
+    // fully expired sstables. Returns an empty descriptor if nothing qualifies.
+    virtual compaction_descriptor get_sstables_for_compaction(column_family& cf, std::vector<shared_sstable> candidates) override {
+        if (candidates.empty()) {
+            return compaction_descriptor();
+        }
+
+        auto gc_before = gc_clock::now() - cf.schema()->gc_grace_seconds();
+
+        // Find fully expired SSTables. Those will be included no matter what.
+        // The scan is rate-limited by expired_sstable_check_frequency.
+        std::vector<shared_sstable> expired;
+
+        if (db_clock::now() - _last_expired_check > _options.expired_sstable_check_frequency) {
+            clogger.debug("TWCS expired check sufficiently far in the past, checking for fully expired SSTables");
+            expired = get_fully_expired_sstables(cf, candidates, gc_before.time_since_epoch().count());
+            _last_expired_check = db_clock::now();
+        } else {
+            clogger.debug("TWCS skipping check for fully expired SSTables");
+        }
+
+        std::vector<shared_sstable> non_expired;
+
+        if (!expired.empty()) {
+            // set_difference() needs both ranges sorted by the key it compares with;
+            // the generation is used as the sstable's identity here.
+            auto by_generation = [] (const shared_sstable& x, const shared_sstable& y) {
+                return x->generation() < y->generation();
+            };
+            boost::range::sort(candidates, by_generation);
+            boost::range::sort(expired, by_generation);
+            boost::set_difference(candidates, expired, std::back_inserter(non_expired), by_generation);
+        } else {
+            non_expired = std::move(candidates);
+        }
+
+        auto compaction_candidates = get_next_non_expired_sstables(cf, std::move(non_expired), gc_before);
+        if (!expired.empty()) {
+            compaction_candidates.insert(compaction_candidates.end(), std::make_move_iterator(expired.begin()), std::make_move_iterator(expired.end()));
+        }
+        // Move explicitly: before C++20, `return compaction_candidates;` would copy the
+        // vector into compaction_descriptor's converting constructor.
+        return compaction_descriptor(std::move(compaction_candidates));
+    }
+private:
+    // Returns the time-window selection for `non_expiring_sstables`. If no window qualifies,
+    // falls back to the single sstable with the smallest min_timestamp among those for which
+    // worth_dropping_tombstones() holds; returns an empty vector if there is none.
+    std::vector<shared_sstable>
+    get_next_non_expired_sstables(column_family& cf, std::vector<shared_sstable> non_expiring_sstables, gc_clock::time_point gc_before) {
+        // Passed by copy: the fallback below still needs the full list.
+        auto most_interesting = get_compaction_candidates(cf, non_expiring_sstables);
+
+        if (!most_interesting.empty()) {
+            return most_interesting;
+        }
+
+        // if there is no sstable to compact in standard way, try compacting single sstable whose droppable tombstone
+        // ratio is greater than threshold.
+        auto e = boost::range::remove_if(non_expiring_sstables, [this, &gc_before] (const shared_sstable& sst) -> bool {
+            return !worth_dropping_tombstones(sst, gc_before);
+        });
+        non_expiring_sstables.erase(e, non_expiring_sstables.end());
+        if (non_expiring_sstables.empty()) {
+            return {};
+        }
+        // NOTE(review): Cassandra's TWCS picks the smallest such sstable by on-disk size;
+        // this port picks the one with the smallest min_timestamp. Confirm this is intended.
+        auto it = boost::min_element(non_expiring_sstables, [] (const shared_sstable& i, const shared_sstable& j) {
+            return i->get_stats_metadata().min_timestamp < j->get_stats_metadata().min_timestamp;
+        });
+        return { *it };
+    }
+
+    // Buckets `candidate_sstables` by time window, updates _highest_window_seen and the
+    // pending-task estimate, and returns newest_bucket()'s selection (possibly empty).
+    std::vector<shared_sstable> get_compaction_candidates(column_family& cf, std::vector<shared_sstable> candidate_sstables) {
+        auto p = get_buckets(std::move(candidate_sstables), _options.sstable_window_size);
+        // Update the highest window seen, if necessary
+        _highest_window_seen = std::max(_highest_window_seen, p.second);
+
+        update_estimated_compaction_by_tasks(p.first, cf.schema()->min_compaction_threshold());
+
+        return newest_bucket(std::move(p.first), cf.schema()->min_compaction_threshold(), cf.schema()->max_compaction_threshold(),
+            _options.sstable_window_size, _highest_window_seen);
+    }
+public:
+    // Returns the lower bound of the window of width `sstable_window_size` that contains
+    // `timestamp`; both timestamps are in microseconds. Windows are aligned to multiples of
+    // the window size since the epoch. `sstable_window_size` must be non-zero (it is a divisor).
+    // NOTE: for negative timestamps the seconds conversion and `%` both round toward zero,
+    // so the result can exceed `timestamp`; Cassandra's implementation does the same.
+    static timestamp_type
+    get_window_lower_bound(std::chrono::seconds sstable_window_size, timestamp_type timestamp) {
+        using namespace std::chrono;
+        auto timestamp_in_sec = duration_cast<seconds>(microseconds(timestamp)).count();
+
+        // mask out window size from timestamp to get lower bound of its window
+        auto window_lower_bound_in_sec = seconds(timestamp_in_sec - (timestamp_in_sec % sstable_window_size.count()));
+
+        return timestamp_type(duration_cast<microseconds>(window_lower_bound_in_sec).count());
+    }
+
+    // Group files with similar max timestamp into buckets.
+    // @return A pair whose first element maps each window's lower bound to the sstables whose
+    //         max timestamp falls in that window, and whose second element is the highest
+    //         lower bound seen, or 0 if none is greater than 0.
+    static std::pair<std::map<timestamp_type, std::vector<shared_sstable>>, timestamp_type>
+    get_buckets(std::vector<shared_sstable> files, std::chrono::seconds sstable_window_size) {
+        std::map<timestamp_type, std::vector<shared_sstable>> buckets;
+
+        timestamp_type max_timestamp = 0;
+        // Add each sstable to the bucket of the window containing its max timestamp.
+        for (auto&& f : files) {
+            timestamp_type ts = f->get_stats_metadata().max_timestamp;
+            timestamp_type lower_bound = get_window_lower_bound(sstable_window_size, ts);
+            buckets[lower_bound].push_back(std::move(f));
+            max_timestamp = std::max(max_timestamp, lower_bound);
+        }
+
+        return std::make_pair(std::move(buckets), max_timestamp);
+    }
+
+    // Scans `buckets` from the newest window to the oldest and returns the sstables of the
+    // first window that qualifies:
+    //  - a window at or after `now` needs at least `min_threshold` sstables and is chosen
+    //    only if size-tiered selection finds something in it;
+    //  - an older window needs at least 2 sstables and is trimmed to `max_threshold`.
+    // Returns an empty vector if no window qualifies. `sstable_window_size` is unused.
+    static std::vector<shared_sstable>
+    newest_bucket(std::map<timestamp_type, std::vector<shared_sstable>> buckets, int min_threshold, int max_threshold,
+            std::chrono::seconds sstable_window_size, timestamp_type now) {
+        for (auto it = buckets.rbegin(); it != buckets.rend(); ++it) {
+            auto key = it->first;
+            auto& bucket = it->second;
+
+            clogger.trace("Key {}, now {}", key, now);
+
+            if (bucket.size() >= size_t(min_threshold) && key >= now) {
+                // If we're in the newest bucket, we'll use STCS to prioritize sstables
+                // NOTE(review): unlike Cassandra, min/max_threshold are not passed to the
+                // size-tiered selection; confirm its result stays within max_threshold.
+                auto stcs_interesting_bucket = size_tiered_most_interesting_bucket(bucket);
+
+                // If the tables in the current bucket aren't eligible in the STCS strategy, we'll skip it and look for other buckets
+                if (!stcs_interesting_bucket.empty()) {
+                    return stcs_interesting_bucket;
+                }
+            } else if (bucket.size() >= 2 && key < now) {
+                clogger.debug("bucket size {} >= 2 and not in current bucket, compacting what's here", bucket.size());
+                return trim_to_threshold(std::move(bucket), max_threshold);
+            } else {
+                clogger.debug("No compaction necessary for bucket size {} , key {}, now {}", bucket.size(), key, now);
+            }
+        }
+        return {};
+    }
+
+    // Returns at most `max_threshold` sstables of `bucket`, keeping those with the smallest on-disk data size.
+    static std::vector<shared_sstable>
+    trim_to_threshold(std::vector<shared_sstable> bucket, int max_threshold) {
+        auto n = std::min(bucket.size(), size_t(max_threshold));
+        // Trim the largest sstables off the end to meet the maxThreshold
+        boost::partial_sort(bucket, bucket.begin() + n, [] (const shared_sstable& i, const shared_sstable& j) {
+            return i->ondisk_data_size() < j->ondisk_data_size();
+        });
+        bucket.resize(n);
+        return bucket;
+    }
+private:
+    // Counts the windows meeting newest_bucket()'s size thresholds relative to
+    // _highest_window_seen and stores the count in _estimated_remaining_tasks.
+    void update_estimated_compaction_by_tasks(const std::map<timestamp_type, std::vector<shared_sstable>>& tasks, int min_threshold) {
+        int64_t n = 0;
+        timestamp_type now = _highest_window_seen;
+
+        // Iterate by reference: a copy would duplicate each window's sstable vector.
+        for (const auto& task : tasks) {
+            auto key = task.first;
+
+            // For current window, make sure it's compactable
+            auto count = task.second.size();
+            if (key >= now && count >= size_t(min_threshold)) {
+                n++;
+            } else if (key < now && count >= 2) {
+                n++;
+            }
+        }
+        _estimated_remaining_tasks = n;
+    }
+public:
+    // Returns the estimate computed by the last selection pass; `cf` is not consulted.
+    virtual int64_t estimated_pending_compactions(column_family& cf) const override {
+        return _estimated_remaining_tasks;
+    }
+
+    // TWCS is not wired into compaction_strategy_type yet, so this always throws.
+    virtual compaction_strategy_type type() const override {
+        throw std::runtime_error("TWCS not wired yet");
+    }
+};
+
+}
diff --git a/sstables/TimeWindowCompactionStrategy.java b/sstables/TimeWindowCompactionStrategy.java
deleted file mode 100644
index e2ab7dc..0000000
--- a/sstables/TimeWindowCompactionStrategy.java
+++ /dev/null
@@ -1,380 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.db.compaction;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.TreeSet;
-import java.util.concurrent.TimeUnit;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.lifecycle.SSTableSet;
-import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.schema.CompactionParams;
-import org.apache.cassandra.utils.Pair;
-
-import static com.google.common.collect.Iterables.filter;
-
-public class TimeWindowCompactionStrategy extends AbstractCompactionStrategy
-{
- private static final Logger logger = LoggerFactory.getLogger(TimeWindowCompactionStrategy.class);
-
- private final TimeWindowCompactionStrategyOptions options;
- protected volatile int estimatedRemainingTasks;
- private final Set<SSTableReader> sstables = new HashSet<>();
- private long lastExpiredCheck;
- private long highestWindowSeen;
-
- public TimeWindowCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options)
- {
- super(cfs, options);
- this.estimatedRemainingTasks = 0;
- this.options = new TimeWindowCompactionStrategyOptions(options);
- if (!options.containsKey(AbstractCompactionStrategy.TOMBSTONE_COMPACTION_INTERVAL_OPTION) && !options.containsKey(AbstractCompactionStrategy.TOMBSTONE_THRESHOLD_OPTION))
- {
- disableTombstoneCompactions = true;
- logger.debug("Disabling tombstone compactions for TWCS");
- }
- else
- logger.debug("Enabling tombstone compactions for TWCS");
-
- }
-
- @Override
- public AbstractCompactionTask getNextBackgroundTask(int gcBefore)
- {
- while (true)
- {
- List<SSTableReader> latestBucket = getNextBackgroundSSTables(gcBefore);
-
- if (latestBucket.isEmpty())
- return null;
-
- LifecycleTransaction modifier = cfs.getTracker().tryModify(latestBucket, OperationType.COMPACTION);
- if (modifier != null)
- return new CompactionTask(cfs, modifier, gcBefore);
- }
- }
-
- /**
- *
- * @param gcBefore
- * @return
- */
- private synchronized List<SSTableReader> getNextBackgroundSSTables(final int gcBefore)
- {
- if (Iterables.isEmpty(cfs.getSSTables(SSTableSet.LIVE)))
- return Collections.emptyList();
-
- Set<SSTableReader> uncompacting = ImmutableSet.copyOf(filter(cfs.getUncompactingSSTables(), sstables::contains));
-
- // Find fully expired SSTables. Those will be included no matter what.
- Set<SSTableReader> expired = Collections.emptySet();
-
- if (System.currentTimeMillis() - lastExpiredCheck > options.expiredSSTableCheckFrequency)
- {
- logger.debug("TWCS expired check sufficiently far in the past, checking for fully expired SSTables");
- expired = CompactionController.getFullyExpiredSSTables(cfs, uncompacting, cfs.getOverlappingLiveSSTables(uncompacting), gcBefore);
- lastExpiredCheck = System.currentTimeMillis();
- }
- else
- {
- logger.debug("TWCS skipping check for fully expired SSTables");
- }
-
- Set<SSTableReader> candidates = Sets.newHashSet(filterSuspectSSTables(uncompacting));
-
- List<SSTableReader> compactionCandidates = new ArrayList<>(getNextNonExpiredSSTables(Sets.difference(candidates, expired), gcBefore));
- if (!expired.isEmpty())
- {
- logger.debug("Including expired sstables: {}", expired);
- compactionCandidates.addAll(expired);
- }
-
- return compactionCandidates;
- }
-
- private List<SSTableReader> getNextNonExpiredSSTables(Iterable<SSTableReader> nonExpiringSSTables, final int gcBefore)
- {
- List<SSTableReader> mostInteresting = getCompactionCandidates(nonExpiringSSTables);
-
- if (mostInteresting != null)
- {
- return mostInteresting;
- }
-
- // if there is no sstable to compact in standard way, try compacting single sstable whose droppable tombstone
- // ratio is greater than threshold.
- List<SSTableReader> sstablesWithTombstones = new ArrayList<>();
- for (SSTableReader sstable : nonExpiringSSTables)
- {
- if (worthDroppingTombstones(sstable, gcBefore))
- sstablesWithTombstones.add(sstable);
- }
- if (sstablesWithTombstones.isEmpty())
- return Collections.emptyList();
-
- return Collections.singletonList(Collections.min(sstablesWithTombstones, new SSTableReader.SizeComparator()));
- }
-
- private List<SSTableReader> getCompactionCandidates(Iterable<SSTableReader> candidateSSTables)
- {
- Pair<HashMultimap<Long, SSTableReader>, Long> buckets = getBuckets(candidateSSTables, options.sstableWindowUnit, options.sstableWindowSize, options.timestampResolution);
- // Update the highest window seen, if necessary
- if(buckets.right > this.highestWindowSeen)
- this.highestWindowSeen = buckets.right;
-
- updateEstimatedCompactionsByTasks(buckets.left);
- List<SSTableReader> mostInteresting = newestBucket(buckets.left,
- cfs.getMinimumCompactionThreshold(),
- cfs.getMaximumCompactionThreshold(),
- options.sstableWindowUnit,
- options.sstableWindowSize,
- options.stcsOptions,
- this.highestWindowSeen);
- if (!mostInteresting.isEmpty())
- return mostInteresting;
- return null;
- }
-
- @Override
- public void addSSTable(SSTableReader sstable)
- {
- sstables.add(sstable);
- }
-
- @Override
- public void removeSSTable(SSTableReader sstable)
- {
- sstables.remove(sstable);
- }
-
- /**
- * Find the lowest and highest timestamps in a given timestamp/unit pair
- * Returns milliseconds, caller should adjust accordingly
- */
- public static Pair<Long,Long> getWindowBoundsInMillis(TimeUnit windowTimeUnit, int windowTimeSize, long timestampInMillis)
- {
- long lowerTimestamp;
- long upperTimestamp;
- long timestampInSeconds = TimeUnit.SECONDS.convert(timestampInMillis, TimeUnit.MILLISECONDS);
-
- switch(windowTimeUnit)
- {
- case MINUTES:
- lowerTimestamp = timestampInSeconds - ((timestampInSeconds) % (60 * windowTimeSize));
- upperTimestamp = (lowerTimestamp + (60L * (windowTimeSize - 1L))) + 59L;
- break;
- case HOURS:
- lowerTimestamp = timestampInSeconds - ((timestampInSeconds) % (3600 * windowTimeSize));
- upperTimestamp = (lowerTimestamp + (3600L * (windowTimeSize - 1L))) + 3599L;
- break;
- case DAYS:
- default:
- lowerTimestamp = timestampInSeconds - ((timestampInSeconds) % (86400 * windowTimeSize));
- upperTimestamp = (lowerTimestamp + (86400L * (windowTimeSize - 1L))) + 86399L;
- break;
- }
-
- return Pair.create(TimeUnit.MILLISECONDS.convert(lowerTimestamp, TimeUnit.SECONDS),
- TimeUnit.MILLISECONDS.convert(upperTimestamp, TimeUnit.SECONDS));
-
- }
-
- /**
- * Group files with similar max timestamp into buckets.
- *
- * @param files pairs consisting of a file and its min timestamp
- * @param sstableWindowUnit
- * @param sstableWindowSize
- * @param timestampResolution
- * @return A pair, where the left element is the bucket representation (map of timestamp to sstablereader), and the right is the highest timestamp seen
- */
- @VisibleForTesting
- static Pair<HashMultimap<Long, SSTableReader>, Long> getBuckets(Iterable<SSTableReader> files, TimeUnit sstableWindowUnit, int sstableWindowSize, TimeUnit timestampResolution)
- {
- HashMultimap<Long, SSTableReader> buckets = HashMultimap.create();
-
- long maxTimestamp = 0;
- // Create hash map to represent buckets
- // For each sstable, add sstable to the time bucket
- // Where the bucket is the file's max timestamp rounded to the nearest window bucket
- for (SSTableReader f : files)
- {
- assert TimeWindowCompactionStrategyOptions.validTimestampTimeUnits.contains(timestampResolution);
- long tStamp = TimeUnit.MILLISECONDS.convert(f.getMaxTimestamp(), timestampResolution);
- Pair<Long,Long> bounds = getWindowBoundsInMillis(sstableWindowUnit, sstableWindowSize, tStamp);
- buckets.put(bounds.left, f);
- if (bounds.left > maxTimestamp)
- maxTimestamp = bounds.left;
- }
-
- logger.trace("buckets {}, max timestamp", buckets, maxTimestamp);
- return Pair.create(buckets, maxTimestamp);
- }
-
- private void updateEstimatedCompactionsByTasks(HashMultimap<Long, SSTableReader> tasks)
- {
- int n = 0;
- long now = this.highestWindowSeen;
-
- for(Long key : tasks.keySet())
- {
- // For current window, make sure it's compactable
- if (key.compareTo(now) >= 0 && tasks.get(key).size() >= cfs.getMinimumCompactionThreshold())
- n++;
- else if (key.compareTo(now) < 0 && tasks.get(key).size() >= 2)
- n++;
- }
- this.estimatedRemainingTasks = n;
- }
-
-
- /**
- * @param buckets list of buckets, sorted from newest to oldest, from which to return the newest bucket within thresholds.
- * @param minThreshold minimum number of sstables in a bucket to qualify.
- * @param maxThreshold maximum number of sstables to compact at once (the returned bucket will be trimmed down to this).
- * @return a bucket (list) of sstables to compact.
- */
- @VisibleForTesting
- static List<SSTableReader> newestBucket(HashMultimap<Long, SSTableReader> buckets, int minThreshold, int maxThreshold, TimeUnit sstableWindowUnit, int sstableWindowSize, SizeTieredCompactionStrategyOptions stcsOptions, long now)
- {
- // If the current bucket has at least minThreshold SSTables, choose that one.
- // For any other bucket, at least 2 SSTables is enough.
- // In any case, limit to maxThreshold SSTables.
-
- TreeSet<Long> allKeys = new TreeSet<>(buckets.keySet());
-
- Iterator<Long> it = allKeys.descendingIterator();
- while(it.hasNext())
- {
- Long key = it.next();
- Set<SSTableReader> bucket = buckets.get(key);
- logger.trace("Key {}, now {}", key, now);
- if (bucket.size() >= minThreshold && key >= now)
- {
- // If we're in the newest bucket, we'll use STCS to prioritize sstables
- List<Pair<SSTableReader,Long>> pairs = SizeTieredCompactionStrategy.createSSTableAndLengthPairs(bucket);
- List<List<SSTableReader>> stcsBuckets = SizeTieredCompactionStrategy.getBuckets(pairs, stcsOptions.bucketHigh, stcsOptions.bucketLow, stcsOptions.minSSTableSize);
- logger.debug("Using STCS compaction for first window of bucket: data files {} , options {}", pairs, stcsOptions);
- List<SSTableReader> stcsInterestingBucket = SizeTieredCompactionStrategy.mostInterestingBucket(stcsBuckets, minThreshold, maxThreshold);
-
- // If the tables in the current bucket aren't eligible in the STCS strategy, we'll skip it and look for other buckets
- if (!stcsInterestingBucket.isEmpty())
- return stcsInterestingBucket;
- }
- else if (bucket.size() >= 2 && key < now)
- {
- logger.debug("bucket size {} >= 2 and not in current bucket, compacting what's here: {}", bucket.size(), bucket);
- return trimToThreshold(bucket, maxThreshold);
- }
- else
- {
- logger.debug("No compaction necessary for bucket size {} , key {}, now {}", bucket.size(), key, now);
- }
- }
- return Collections.<SSTableReader>emptyList();
- }
-
- /**
- * @param bucket set of sstables
- * @param maxThreshold maximum number of sstables in a single compaction task.
- * @return A bucket trimmed to the maxThreshold newest sstables.
- */
- @VisibleForTesting
- static List<SSTableReader> trimToThreshold(Set<SSTableReader> bucket, int maxThreshold)
- {
- List<SSTableReader> ssTableReaders = new ArrayList<>(bucket);
-
- // Trim the largest sstables off the end to meet the maxThreshold
- Collections.sort(ssTableReaders, new SSTableReader.SizeComparator());
-
- return ImmutableList.copyOf(Iterables.limit(ssTableReaders, maxThreshold));
- }
-
- @Override
- public synchronized Collection<AbstractCompactionTask> getMaximalTask(int gcBefore, boolean splitOutput)
- {
- Iterable<SSTableReader> filteredSSTables = filterSuspectSSTables(sstables);
- if (Iterables.isEmpty(filteredSSTables))
- return null;
- LifecycleTransaction txn = cfs.getTracker().tryModify(filteredSSTables, OperationType.COMPACTION);
- if (txn == null)
- return null;
- return Collections.singleton(new CompactionTask(cfs, txn, gcBefore));
- }
-
- @Override
- public synchronized AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)
- {
- assert !sstables.isEmpty(); // checked for by CM.submitUserDefined
-
- LifecycleTransaction modifier = cfs.getTracker().tryModify(sstables, OperationType.COMPACTION);
- if (modifier == null)
- {
- logger.debug("Unable to mark {} for compaction; probably a background compaction got to it first. You can disable background compactions temporarily if this is a problem", sstables);
- return null;
- }
-
- return new CompactionTask(cfs, modifier, gcBefore).setUserDefined(true);
- }
-
- public int getEstimatedRemainingTasks()
- {
- return this.estimatedRemainingTasks;
- }
-
- public long getMaxSSTableBytes()
- {
- return Long.MAX_VALUE;
- }
-
-
- public static Map<String, String> validateOptions(Map<String, String> options) throws ConfigurationException
- {
- Map<String, String> uncheckedOptions = AbstractCompactionStrategy.validateOptions(options);
- uncheckedOptions = TimeWindowCompactionStrategyOptions.validateOptions(options, uncheckedOptions);
-
- uncheckedOptions.remove(CompactionParams.Option.MIN_THRESHOLD.toString());
- uncheckedOptions.remove(CompactionParams.Option.MAX_THRESHOLD.toString());
-
- return uncheckedOptions;
- }
-
- public String toString()
- {
- return String.format("TimeWindowCompactionStrategy[%s/%s]",
- cfs.getMinimumCompactionThreshold(),
- cfs.getMaximumCompactionThreshold());
- }
-}
--
2.9.4