On 4/20/20 3:01 PM, Krzysztof Małysa wrote:
> SeastarFS is a log-structured filesystem. Every shard will have 3
> private logs:
> - metadata log
> - medium data log
> - big data log (this is not actually a log, but in the big picture it
> looks like one)
>
> Disk space is divided into clusters (typically several MiB each) that
> all have the same size, which is a multiple of the alignment (typically
> 4096 bytes). Each shard has its private pool of clusters (assignment is
> stored in bootstrap record). Each log consumes clusters one by one -- it
> writes the current one and if cluster becomes full, then log switches to
> a new one that is obtained from a pool of free clusters managed by
> cluster_allocator. Metadata log and medium data log write data in the
> same manner: they fill up the cluster gradually from left to right. Big
> data log takes a cluster and completely fills it with data at once -- it
> is only used during big writes.
>
> This commit adds the skeleton of the metadata log:
> - data structures for holding metadata in memory with all operations on
> this data structure i.e. manipulating files and their contents
> - locking logic (detailed description can be found in metadata_log.hh)
> - buffers for writing logs to disk (one for metadata and one for medium
> data)
> - basic higher level interface e.g. path lookup, iterating over
> directory
> - bootstrapping metadata log == reading metadata log from disk and
> reconstructing shard's filesystem structure from just before shutdown
>
> File content is stored as a set of data vectors that may have one of
> three kinds: in memory data, on disk data, hole. Small writes are
> written directly to the metadata log and because all metadata is stored
> in the memory these writes are also in memory, therefore in-memory kind.
> Medium and large data are not stored in memory, so they are represented
> using on-disk kind. Enlarging file via truncate may produce holes, hence
> hole kind.
>
> Directory entries are stored as metadata log entries -- directory inodes
> have no content.
>
> To disk buffers accumulate data that will be written to disk. There are two
> kinds: (normal) to disk buffer and metadata to disk buffer. The latter
> is implemented using the former, but provides higher level interface for
> appending metadata log entries rather than raw bytes.
>
> Normal to disk buffer appends data sequentially, but if a flush occurs
> the offset where next data will be appended is aligned up to alignment
> to ensure that writes to the same cluster are non-overlapping.
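To make the flush rule above concrete, here is a minimal sketch of the offset arithmetic only. The names toy_to_disk_buffer, append and flush are hypothetical and are not the interface from this patch; the alignment is assumed to be a power of two.

```cpp
#include <cstddef>
#include <cstdint>

// Rounds offset up to the nearest multiple of alignment.
// Assumes alignment is a power of two (e.g. 4096).
constexpr std::uint64_t align_up(std::uint64_t offset, std::uint64_t alignment) noexcept {
    return (offset + alignment - 1) & ~(alignment - 1);
}

// Toy model (hypothetical) of a buffer that tracks offsets only, no data.
struct toy_to_disk_buffer {
    std::uint64_t alignment;
    std::uint64_t unflushed_begin = 0; // start of the data not yet written
    std::uint64_t append_pos = 0;      // where the next append lands

    struct range { std::uint64_t begin, end; };

    void append(std::uint64_t size) noexcept { append_pos += size; }

    // Returns the [begin, end) device range written by this flush.
    // The end is padded up to the alignment, so the next flush starts on a
    // fresh aligned block and cannot overlap this one.
    range flush() noexcept {
        range written{unflushed_begin, align_up(append_pos, alignment)};
        append_pos = written.end;
        unflushed_begin = written.end;
        return written;
    }
};
```

The cost of this scheme is the padding lost on every flush, which is presumably why small appends are batched between flushes.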
>
> Metadata to disk buffer appends data using normal to disk buffer but
> does some formatting along the way. The structure of the metadata log on
> disk is as follows:
> | checkpoint_1 | entry_1, entry_2, ..., entry_n | checkpoint_2 | ... |
> | <---- checkpointed data -----> |
> etc. Every batch of metadata_log entries is preceded by a checkpoint
> entry. Appending to the metadata log extends the current batch of entries.
> Flushing or lack of space ends current batch of entries and then
> checkpoint entry is updated (because it holds CRC code of all
> checkpointed data) and then write of the whole batch is requested and a
> new checkpoint (if there is space for that) is started. Last checkpoint
> in a cluster contains a special entry pointing to the next cluster that
> is utilized by the metadata log.
>
> Bootstrapping is, in fact, just replaying of all actions from metadata log
> that were saved on disk. It works as follows:
> - reads metadata log clusters one by one
> - for each cluster, until the last checkpoint contains pointer to the
> next cluster, processes the checkpoint and entries it checkpoints
> - processing works as follows:
> - checkpoint entry is read and if it is invalid it means that the
> metadata log ends here (last checkpoint was partially written or the
> metadata log really ended here or there was some data corruption...)
> and we stop
> - if it is correct, it contains the length of the checkpointed data
> (metadata log entries), so then we process all of them (error there
> indicates that there was data corruption but CRC is still somehow
> correct, so we abort all bootstrapping with an error)
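The replay loop described above can be sketched for a single cluster as follows. This is a toy model under stated assumptions: toy_checkpoint is a hypothetical header layout, the checksum is a plain byte sum standing in for the CRC, and following the pointer to the next cluster is left out.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

// Toy checkpoint header (hypothetical layout, not the patch's ondisk_checkpoint).
struct toy_checkpoint {
    std::uint32_t checksum;
    std::uint32_t length; // number of payload bytes that follow the header
};

// Stand-in for the CRC: a plain byte sum, chosen only to keep the sketch short.
inline std::uint32_t toy_checksum(const std::uint8_t* data, std::size_t size) {
    std::uint32_t sum = 0;
    for (std::size_t i = 0; i < size; ++i) {
        sum += data[i];
    }
    return sum;
}

// Replays checkpoints until the first invalid one and returns the accepted payload.
inline std::vector<std::uint8_t> replay(const std::vector<std::uint8_t>& cluster) {
    std::vector<std::uint8_t> replayed;
    std::size_t pos = 0;
    while (cluster.size() - pos >= sizeof(toy_checkpoint)) {
        toy_checkpoint cp;
        std::memcpy(&cp, cluster.data() + pos, sizeof(cp));
        const std::size_t payload = pos + sizeof(cp);
        if (cp.length == 0 || cp.length > cluster.size() - payload ||
            toy_checksum(cluster.data() + payload, cp.length) != cp.checksum) {
            break; // invalid checkpoint: the log ends here
        }
        replayed.insert(replayed.end(), cluster.begin() + payload,
                        cluster.begin() + payload + cp.length);
        pos = payload + cp.length;
    }
    return replayed;
}

// Helper for building input: appends one checkpoint followed by its payload.
inline void append_batch(std::vector<std::uint8_t>& cluster, const std::vector<std::uint8_t>& payload) {
    toy_checkpoint cp{toy_checksum(payload.data(), payload.size()),
                      static_cast<std::uint32_t>(payload.size())};
    const auto* raw = reinterpret_cast<const std::uint8_t*>(&cp);
    cluster.insert(cluster.end(), raw, raw + sizeof(cp));
    cluster.insert(cluster.end(), payload.begin(), payload.end());
}
```

Note that an invalid checkpoint simply ends the replay, while in the real code an invalid entry inside a checkpoint that passed its CRC is treated as a hard error.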
>
> Locking is to ensure that concurrent modifications of the metadata do
> not corrupt it. E.g. Creating a file is a complex operation: you have
> to create inode and add a directory entry that will represent this inode
> with a path and write corresponding metadata log entries to the disk.
> Simultaneous attempts of creating the same file could corrupt the file
> system. Not to mention concurrent create and unlink on the same path...
> Thus a careful and robust locking mechanism is used. For details see
> metadata_log.hh.
>
> diff --git a/src/fs/inode_info.hh b/src/fs/inode_info.hh
> new file mode 100644
> index 00000000..89bc71d8
> --- /dev/null
> +++ b/src/fs/inode_info.hh
> +
> +namespace seastar::fs {
> +
> +struct inode_data_vec {
> + file_range data_range; // data spans [beg, end) range of the file
> +
> + struct in_mem_data {
> + temporary_buffer<uint8_t> data;
> + };
> +
> + struct on_disk_data {
> + file_offset_t device_offset;
> + };
No separate representation for medium data? The offsets can change while
rewriting the log.
> +
> + struct hole_data { };
> +
> + std::variant<in_mem_data, on_disk_data, hole_data> data_location;
> +
> + // TODO: rename that function to something more suitable
> + inode_data_vec share_copy() {
Maybe "shallow_copy"?
> + inode_data_vec shared;
> + shared.data_range = data_range;
> + std::visit(overloaded {
> + [&](inode_data_vec::in_mem_data& mem) {
> + shared.data_location = inode_data_vec::in_mem_data {mem.data.share()};
Please don't add these spaces in the middle of constructors, it's
distracting.
> + },
> + [&](inode_data_vec::on_disk_data& disk_data) {
> + shared.data_location = disk_data;
> + },
> + [&](inode_data_vec::hole_data&) {
> + shared.data_location = inode_data_vec::hole_data {};
> + },
> + }, data_location);
> + return shared;
> + }
> +};
> +
> +struct inode_info {
> + uint32_t opened_files_count = 0; // Number of open files referencing inode
> + uint32_t directories_containing_file = 0;
Suppose one directory has two names for the same inode. Does it count as 1
or 2 here?
> + unix_metadata metadata;
> +
> + struct directory {
> + // TODO: directory entry cannot contain '/' character --> add checks for that
> + std::map<std::string, inode_t, std::less<>> entries; // entry name => inode
Why std::map and not unordered_map? Resumable iteration? Please add
comments.
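If resumable iteration is the reason (that is my reading of iterate_directory further down, not something the patch states), the comment could describe a pattern like the sketch below. The name next_entry and the int inode stand-in are hypothetical.

```cpp
#include <functional>
#include <map>
#include <optional>
#include <string>
#include <string_view>

// name => inode number (int is a stand-in for inode_t)
using entries_map = std::map<std::string, int, std::less<>>;

// Returns the first entry name strictly greater than prev, if any.
// Only the last visited name is kept between steps, never an iterator,
// so entries may be added or removed between two calls.
inline std::optional<std::string> next_entry(const entries_map& entries, std::string_view prev) {
    auto it = entries.upper_bound(prev);
    if (it == entries.end()) {
        return std::nullopt;
    }
    return it->first;
}
```

An unordered_map has no stable order to resume from after a rehash or an erase, so it would need a different scheme, such as snapshotting the names up front.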
> + };
> +
> + struct file {
> + std::map<file_offset_t, inode_data_vec> data; // file offset => data vector that begins there (data vectors
> + // do not overlap)
> +
This can be optimized, but no need to touch it now. Probably some kind
of radix tree.
> + file_offset_t size() const noexcept {
> + return (data.empty() ? 0 : data.rbegin()->second.data_range.end);
> + }
> +
> + // Deletes data vectors that are subset of @p data_range and cuts overlapping data vectors to make them
> + // not overlap. @p cut_data_vec_processor is called on each inode_data_vec (including parts of overlapped
> + // data vectors) that will be deleted
> + template<class Func>
Please add a constraint for the signature of Func, both to document it
and to catch errors earlier. I see you have a static assert, but
constraints are better since they apply to the declaration, not the body.
Consider also changing it from a template parameter to a
noncopyable_function. These are all heavyweight operations so saving
every cycle isn't necessary.
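A rough sketch of what I mean by constraining the declaration, written with C++17 enable_if so it stands alone; with concepts it would be a requires-clause instead. The names piece and for_each_piece are placeholders, not code from this patch.

```cpp
#include <type_traits>
#include <utility>
#include <vector>

struct piece { int beg; int end; }; // stand-in for inode_data_vec

// The enable_if is part of the declaration, so a callable with the wrong
// signature is rejected during overload resolution, not inside the body.
template<class Func, class = std::enable_if_t<std::is_invocable_v<Func&, const piece&>>>
void for_each_piece(const std::vector<piece>& pieces, Func&& func) {
    for (const piece& p : pieces) {
        func(p);
    }
}

// Detection helper: shows that the constraint is visible from outside the body.
template<class Func, class = void>
struct accepts : std::false_type {};

template<class Func>
struct accepts<Func, std::void_t<decltype(for_each_piece(
        std::declval<const std::vector<piece>&>(), std::declval<Func>()))>> : std::true_type {};
```

A static_assert in the body cannot be detected this way: the declaration would still match and the error would surface only on instantiation.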
> + void cut_out_data_range(file_range range, Func&& cut_data_vec_processor) {
> + static_assert(std::is_invocable_v<Func, inode_data_vec>);
> + // Cut all vectors intersecting with range
> + auto it = data.lower_bound(range.beg);
> + if (it != data.begin() and are_intersecting(range, prev(it)->second.data_range)) {
> + --it;
> + }
> +
> + while (it != data.end() and are_intersecting(range, it->second.data_range)) {
> + auto data_vec = std::move(data.extract(it++).mapped());
I presume this removes *it from the map?
> + const auto cap = intersection(range, data_vec.data_range);
> + if (cap == data_vec.data_range) {
> + // Fully intersects => remove it
> + cut_data_vec_processor(std::move(data_vec));
If cut_data_vec_processor fails, we have to reinstate *it.
> + continue;
> + }
> +
> + // Overlaps => cut it, possibly into two parts:
> + // | data_vec |
> + // | cap |
> + // | left | mid | right |
> + // left and right remain, but mid is deleted
> + inode_data_vec left, mid, right;
> + left.data_range = {data_vec.data_range.beg, cap.beg};
> + mid.data_range = cap;
> + right.data_range = {cap.end, data_vec.data_range.end};
> + auto right_beg_shift = right.data_range.beg - data_vec.data_range.beg;
> + auto mid_beg_shift = mid.data_range.beg - data_vec.data_range.beg;
> + std::visit(overloaded {
> + [&](inode_data_vec::in_mem_data& mem) {
> + left.data_location = inode_data_vec::in_mem_data {mem.data.share(0, left.data_range.size())};
> + mid.data_location = inode_data_vec::in_mem_data {
> + mem.data.share(mid_beg_shift, mid.data_range.size())
> + };
> + right.data_location = inode_data_vec::in_mem_data {
> + mem.data.share(right_beg_shift, right.data_range.size())
> + };
> + },
> + [&](inode_data_vec::on_disk_data& disk_data) {
> + left.data_location = disk_data;
> + mid.data_location = inode_data_vec::on_disk_data {disk_data.device_offset + mid_beg_shift};
> + right.data_location = inode_data_vec::on_disk_data {disk_data.device_offset + right_beg_shift};
> + },
> + [&](inode_data_vec::hole_data&) {
> + left.data_location = inode_data_vec::hole_data {};
> + mid.data_location = inode_data_vec::hole_data {};
> + right.data_location = inode_data_vec::hole_data {};
> + },
> + }, data_vec.data_location);
> +
> + // Save new data vectors
> + if (not left.data_range.is_empty()) {
> + data.emplace(left.data_range.beg, std::move(left));
> + }
> + if (not right.data_range.is_empty()) {
> + data.emplace(right.data_range.beg, std::move(right));
> + }
We need to undo this if cut_data_vec_processor fails. It also means that
cut_data_vec_processor has to commit to strong exception guarantees - if
it throws, it must undo everything it has done for this element.
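For the fully-covered case, reinstating the element can be fairly cheap because extract() hands back a node that can be re-linked. A minimal sketch, assuming the processor takes the value by reference rather than by move; remove_and_process is a hypothetical name and the map of ints is a stand-in for the real data map.

```cpp
#include <map>
#include <stdexcept>
#include <utility>

// Removes the element at key and hands it to processor.
// If processor throws, the extracted node is put back, so the map is left
// exactly as it was before the call.
template<class Func>
void remove_and_process(std::map<int, int>& data, int key, Func&& processor) {
    auto it = data.find(key);
    if (it == data.end()) {
        return;
    }
    auto node = data.extract(it); // unlinks the node without destroying it
    try {
        processor(node.mapped());
    } catch (...) {
        data.insert(std::move(node)); // re-links the same node, no allocation
        throw;
    }
}
```

The split case (left, mid, right) needs more bookkeeping, since the inserted left and right parts have to be erased again before the original vector is put back.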
> +
> + // Process deleted vector
> + cut_data_vec_processor(std::move(mid));
> + }
> + }
Very nice.
> +
> + // Executes @p execute_on_data_ranges_processor on each data vector that is a subset of @p data_range.
> + // Data vectors on the edges are appropriately trimmed before passed to the function.
> + template<class Func>
Again, please add a constraint, and consider using a noncopyable_function
instead.
> + void execute_on_data_range(file_range range, Func&& execute_on_data_range_processor) {
> + static_assert(std::is_invocable_v<Func, inode_data_vec>);
> + auto it = data.lower_bound(range.beg);
> + if (it != data.begin() and are_intersecting(range, prev(it)->second.data_range)) {
> + --it;
> + }
> +
> + while (it != data.end() and are_intersecting(range, it->second.data_range)) {
> + auto& data_vec = (it++)->second;
> + const auto cap = intersection(range, data_vec.data_range);
> + if (cap == data_vec.data_range) {
> + // Fully intersects => execute
> + execute_on_data_range_processor(data_vec.share_copy());
> + continue;
> + }
> +
> + inode_data_vec mid;
> + mid.data_range = std::move(cap);
> + auto mid_beg_shift = mid.data_range.beg - data_vec.data_range.beg;
> + std::visit(overloaded {
> + [&](inode_data_vec::in_mem_data& mem) {
> + mid.data_location = inode_data_vec::in_mem_data {
> + mem.data.share(mid_beg_shift, mid.data_range.size())
> + };
> + },
> + [&](inode_data_vec::on_disk_data& disk_data) {
> + mid.data_location = inode_data_vec::on_disk_data {disk_data.device_offset + mid_beg_shift};
> + },
> + [&](inode_data_vec::hole_data&) {
> + mid.data_location = inode_data_vec::hole_data {};
> + },
> + }, data_vec.data_location);
> +
> + // Execute on middle range
> + execute_on_data_range_processor(std::move(mid));
> + }
> + }
> + };
> +
> + std::variant<directory, file> contents;
> +
Please move to top-of-class and make private.
> + bool is_linked() const noexcept {
> + return directories_containing_file != 0;
> + }
> +
> + bool is_open() const noexcept {
> + return opened_files_count != 0;
> + }
> +
> + constexpr bool is_directory() const noexcept { return std::holds_alternative<directory>(contents); }
> +
> + // These are noexcept because invalid access is a bug not an error
> + constexpr directory& get_directory() & noexcept { return std::get<directory>(contents); }
> + constexpr const directory& get_directory() const & noexcept { return std::get<directory>(contents); }
> + constexpr directory&& get_directory() && noexcept { return std::move(std::get<directory>(contents)); }
> +
> + constexpr bool is_file() const noexcept { return std::holds_alternative<file>(contents); }
> +
> + // These are noexcept because invalid access is a bug not an error
> + constexpr file& get_file() & noexcept { return std::get<file>(contents); }
> + constexpr const file& get_file() const & noexcept { return std::get<file>(contents); }
> + constexpr file&& get_file() && noexcept { return std::move(std::get<file>(contents)); }
> +};
> +
> +} // namespace seastar::fs
> diff --git a/src/fs/metadata_disk_entries.hh b/src/fs/metadata_disk_entries.hh
> new file mode 100644
> index 00000000..44c2a1c7
> --- /dev/null
> +++ b/src/fs/metadata_disk_entries.hh
> @@ -0,0 +1,63 @@
> +/*
> + * This file is open source software, licensed to you under the terms
> + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
> + * distributed with this work for additional information regarding copyright
> + * ownership. You may not use this file except in compliance with the License.
> + *
> + * You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing,
> + * software distributed under the License is distributed on an
> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> + * KIND, either express or implied. See the License for the
> + * specific language governing permissions and limitations
> + * under the License.
> + */
> +/*
> + * Copyright (C) 2019 ScyllaDB
> + */
> +
> +#pragma once
> +
> +#include "fs/cluster.hh"
> +#include "fs/inode.hh"
> +#include "fs/unix_metadata.hh"
> +
> +namespace seastar::fs {
> +
> +enum ondisk_type : uint8_t {
> + INVALID = 0,
> + CHECKPOINT,
> + NEXT_METADATA_CLUSTER,
> +};
> +
> +struct ondisk_checkpoint {
> + // The disk format is as follows:
> + // | ondisk_checkpoint | .............................. |
> + // | data |
> + // |<-- checkpointed_data_length -->|
> + // ^
> + // ______________________________________________/
> + // /
> + // there ends checkpointed data and (next checkpoint begins or metadata in the current cluster end)
> + //
> + // CRC is calculated from byte sequence | data | checkpointed_data_length |
> + // E.g. if the data consist of bytes "abcd" and checkpointed_data_length of bytes "xyz" then the byte sequence
> + // would be "abcdxyz"
> + uint32_t crc32_code;
> + unit_size_t checkpointed_data_length;
> +} __attribute__((packed));
You can use the nicer [[gnu::packed]].
Let's also add a uuid unique to the filesystem instance, so we don't
pick up a valid log entry after we reformat a filesystem (if we do,
we'll reject it because the uuids don't match).
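Something along these lines, as a sketch only. The struct name, the field name fs_uuid and the 16-byte size are my assumptions, and unit_size_t is assumed to be a 32-bit integer here.

```cpp
#include <cstdint>
#include <cstring>

// Hypothetical variant of the checkpoint header carrying a filesystem-instance UUID.
struct [[gnu::packed]] toy_ondisk_checkpoint {
    std::uint32_t crc32_code;
    std::uint32_t checkpointed_data_length; // assumed 32-bit stand-in for unit_size_t
    std::uint8_t fs_uuid[16];               // assumed to be written at format time
};

static_assert(sizeof(toy_ondisk_checkpoint) == 24, "packed layout has no padding");

// A checkpoint left over from a previous format fails this check even if its
// CRC happens to be valid.
inline bool belongs_to(const toy_ondisk_checkpoint& cp, const std::uint8_t (&fs_uuid)[16]) noexcept {
    return std::memcmp(cp.fs_uuid, fs_uuid, sizeof(cp.fs_uuid)) == 0;
}
```

Whether the uuid should also be covered by the CRC is a separate question that I have no strong opinion on.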
> +
> +struct ondisk_next_metadata_cluster {
> + cluster_id_t cluster_id; // metadata log continues there
> +} __attribute__((packed));
> +
> +template<typename T>
> +constexpr size_t ondisk_entry_size(const T& entry) noexcept {
> + static_assert(std::is_same_v<T, ondisk_next_metadata_cluster>, "ondisk entry size not defined for given type");
> + return sizeof(ondisk_type) + sizeof(entry);
> +}
> +
> +} // namespace seastar::fs
> diff --git a/src/fs/metadata_log.hh b/src/fs/metadata_log.hh
> new file mode 100644
> index 00000000..c10852a3
> --- /dev/null
> +++ b/src/fs/metadata_log.hh
> @@ -0,0 +1,295 @@
> +/*
> + * This file is open source software, licensed to you under the terms
> + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
> + * distributed with this work for additional information regarding copyright
> + * ownership. You may not use this file except in compliance with the License.
> + *
> + * You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing,
> + * software distributed under the License is distributed on an
> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> + * KIND, either express or implied. See the License for the
> + * specific language governing permissions and limitations
> + * under the License.
> + */
> +/*
> + * Copyright (C) 2020 ScyllaDB
> + */
> +
> +#pragma once
> +
> +#include "fs/cluster.hh"
> +#include "fs/cluster_allocator.hh"
> +#include "fs/inode.hh"
> +#include "fs/inode_info.hh"
> +#include "fs/metadata_disk_entries.hh"
> +#include "fs/metadata_to_disk_buffer.hh"
> +#include "fs/units.hh"
> +#include "fs/unix_metadata.hh"
> +#include "fs/value_shared_lock.hh"
> +#include "seastar/core/file-types.hh"
> +#include "seastar/core/future-util.hh"
> +#include "seastar/core/future.hh"
> +#include "seastar/core/shared_future.hh"
> +#include "seastar/core/shared_ptr.hh"
> +#include "seastar/core/temporary_buffer.hh"
> +#include "seastar/fs/exceptions.hh"
> +
> +#include <chrono>
> +#include <cstddef>
> +#include <exception>
> +#include <type_traits>
> +#include <utility>
> +#include <variant>
> +
> +namespace seastar::fs {
> +
> +class metadata_log {
> + block_device _device;
> + const unit_size_t _cluster_size;
> + const unit_size_t _alignment;
> +
> + // Takes care of writing current cluster of serialized metadata log entries to device
> + shared_ptr<metadata_to_disk_buffer> _curr_cluster_buff;
> + shared_future<> _background_futures = now();
> +
> + // In memory metadata
> + cluster_allocator _cluster_allocator;
> + std::map<inode_t, inode_info> _inodes;
> +
unordered_map?
> + inode_t _root_dir;
> + shard_inode_allocator _inode_allocator;
> +
> + // Locks are used to ensure metadata consistency while allowing concurrent usage.
> + //
> + // Whenever one wants to create or delete inode or directory entry, one has to acquire appropriate unique lock for
> + // the inode / dir entry that will appear / disappear and only after locking that operation should take place.
> + // Shared locks should be used only to ensure that an inode / dir entry won't disappear / appear, while some action
> + // is performed. Therefore, unique locks ensure that resource is not used by anyone else.
> + //
> + // IMPORTANT: if an operation needs to acquire more than one lock, it has to be done with *one* call to
> + // locks::with_locks() because it is ensured there that a deadlock-free locking order is used (for details see
> + // that function).
> + //
> + // Examples:
> + // - To create file we have to take shared lock (SL) on the directory to which we add a dir entry and
> + // unique lock (UL) on the added entry in this directory. SL is taken because the directory should not disappear.
> + // UL is taken, because we do not want the entry to appear while we are creating it.
> + // - To read or write to a file, a SL is acquired on its inode and then the operation is performed.
> + class locks {
> + value_shared_lock<inode_t> _inode_locks;
> + value_shared_lock<std::pair<inode_t, std::string>> _dir_entry_locks;
> +
> + public:
> + struct shared {
> + inode_t inode;
> + std::optional<std::string> dir_entry;
> + };
> +
> + template<class T>
> + static constexpr bool is_shared = std::is_same_v<std::remove_cv_t<std::remove_reference_t<T>>, shared>;
> +
> + struct unique {
> + inode_t inode;
> + std::optional<std::string> dir_entry;
> + };
> +
> + template<class T>
> + static constexpr bool is_unique = std::is_same_v<std::remove_cv_t<std::remove_reference_t<T>>, unique>;
> +
> + template<class Kind, class Func>
> + auto with_lock(Kind kind, Func&& func) {
> + static_assert(is_shared<Kind> or is_unique<Kind>);
> + if constexpr (is_shared<Kind>) {
> + if (kind.dir_entry.has_value()) {
> + return _dir_entry_locks.with_shared_on({kind.inode, std::move(*kind.dir_entry)},
> + std::forward<Func>(func));
> + } else {
> + return _inode_locks.with_shared_on(kind.inode, std::forward<Func>(func));
> + }
> + } else {
> + if (kind.dir_entry.has_value()) {
> + return _dir_entry_locks.with_lock_on({kind.inode, std::move(*kind.dir_entry)},
> + std::forward<Func>(func));
> + } else {
> + return _inode_locks.with_lock_on(kind.inode, std::forward<Func>(func));
> + }
> + }
> + }
> +
> + private:
> + template<class Kind1, class Kind2, class Func>
> + auto with_locks_in_order(Kind1 kind1, Kind2 kind2, Func func) {
> + // Func is not an universal reference because we will have to store it
> + return with_lock(std::move(kind1), [this, kind2 = std::move(kind2), func = std::move(func)] () mutable {
> + return with_lock(std::move(kind2), std::move(func));
> + });
> + };
> +
> + public:
> +
> + template<class Kind1, class Kind2, class Func>
> + auto with_locks(Kind1 kind1, Kind2 kind2, Func&& func) {
> + static_assert(is_shared<Kind1> or is_unique<Kind1>);
> + static_assert(is_shared<Kind2> or is_unique<Kind2>);
> +
> + // Locking order is as follows: kind with lower tuple (inode, dir_entry) goes first.
> + // This order is linear and we always lock in one direction, so the graph of locking relations (A -> B iff
> + // lock on A is acquired and lock on B is acquired / being acquired) makes a DAG. Thus, deadlock is
> + // impossible, as it would require a cycle to appear.
> + std::pair<inode_t, std::optional<std::string>&> k1 {kind1.inode, kind1.dir_entry};
> + std::pair<inode_t, std::optional<std::string>&> k2 {kind2.inode, kind2.dir_entry};
> + if (k1 < k2) {
> + return with_locks_in_order(std::move(kind1), std::move(kind2), std::forward<Func>(func));
> + } else {
> + return with_locks_in_order(std::move(kind2), std::move(kind1), std::forward<Func>(func));
> + }
You can std::swap(kind1, kind2) instead of duplicating the call. This
will reduce the compiler's motivation to inline two calls to func,
bloating the code.
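A sketch of the single-call-site shape. One caveat: std::swap needs both arguments to have the same type, while here Kind1 and Kind2 can be shared and unique respectively, so the sketch assumes one request type with a runtime flag. lock_request and locks_before are hypothetical names, and the function records the acquisition order instead of taking real locks.

```cpp
#include <string>
#include <tuple>
#include <utility>
#include <vector>

// Hypothetical single request type: shared vs unique is a runtime flag,
// which makes two requests swappable.
struct lock_request {
    int inode;
    std::string dir_entry;
    bool exclusive;
};

inline bool locks_before(const lock_request& a, const lock_request& b) {
    return std::tie(a.inode, a.dir_entry) < std::tie(b.inode, b.dir_entry);
}

// Records the acquisition order in acquired, then runs func.
template<class Func>
auto with_locks(std::vector<lock_request>& acquired, lock_request first, lock_request second, Func&& func) {
    if (locks_before(second, first)) {
        std::swap(first, second);
    }
    // Single call site: func is forwarded along one code path only.
    acquired.push_back(std::move(first));
    acquired.push_back(std::move(second));
    return std::forward<Func>(func)();
}
```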
> + }
> + } _locks;
Very nice, this locking system.
> +
> + // TODO: for compaction: keep some set(?) of inode_data_vec, so that we can keep track of clusters that have lowest
> + // utilization (up-to-date data)
> + // TODO: for compaction: keep estimated metadata log size (that would take when written to disk) and
> + // the real size of metadata log taken on disk to allow for detecting when compaction
> +
> + friend class metadata_log_bootstrap;
> +
> +public:
> + metadata_log(block_device device, unit_size_t cluster_size, unit_size_t alignment,
> + shared_ptr<metadata_to_disk_buffer> cluster_buff);
> +
> + metadata_log(block_device device, unit_size_t cluster_size, unit_size_t alignment);
> +
> + metadata_log(const metadata_log&) = delete;
> + metadata_log& operator=(const metadata_log&) = delete;
> + metadata_log(metadata_log&&) = default;
> +
> + future<> bootstrap(inode_t root_dir, cluster_id_t first_metadata_cluster_id, cluster_range available_clusters,
> + fs_shard_id_t fs_shards_pool_size, fs_shard_id_t fs_shard_id);
> +
> + future<> shutdown();
> +
> +private:
> + bool inode_exists(inode_t inode) const noexcept {
> + return _inodes.count(inode) != 0;
> + }
> +
> + template<class Func>
> + void schedule_background_task(Func&& task) {
> + _background_futures = when_all_succeed(_background_futures.get_future(), std::forward<Func>(task));
> + }
Please add a TODO to limit the amount of background work. This code
allows it to grow without bounds, so if we're not able to retire this
work quickly enough we can go OOM.
> +
> + void schedule_flush_of_curr_cluster();
> +
> + enum class flush_result {
> + DONE,
> + NO_SPACE
> + };
> +
> + [[nodiscard]] flush_result schedule_flush_of_curr_cluster_and_change_it_to_new_one();
> +
> + future<> flush_curr_cluster();
> +
> + enum class append_result {
> + APPENDED,
> + TOO_BIG,
> + NO_SPACE
> + };
> +
> + template<class... Args>
> + [[nodiscard]] append_result append_ondisk_entry(Args&&... args) {
> + using AR = append_result;
> + // TODO: maybe check for errors on _background_futures to expose previous errors?
> + switch (_curr_cluster_buff->append(args...)) {
> + case metadata_to_disk_buffer::APPENDED:
> + return AR::APPENDED;
> + case metadata_to_disk_buffer::TOO_BIG:
> + break;
> + }
> +
> + switch (schedule_flush_of_curr_cluster_and_change_it_to_new_one()) {
> + case flush_result::NO_SPACE:
> + return AR::NO_SPACE;
> + case flush_result::DONE:
> + break;
> + }
> +
> + switch (_curr_cluster_buff->append(args...)) {
> + case metadata_to_disk_buffer::APPENDED:
> + return AR::APPENDED;
> + case metadata_to_disk_buffer::TOO_BIG:
> + return AR::TOO_BIG;
> + }
> +
> + __builtin_unreachable();
> + }
> +
> + enum class path_lookup_error {
> + NOT_ABSOLUTE, // a path is not absolute
> + NO_ENTRY, // no such file or directory
> + NOT_DIR, // a component used as a directory in path is not, in fact, a directory
> + };
> +
> + std::variant<inode_t, path_lookup_error> do_path_lookup(const std::string& path) const noexcept;
> +
> + // It is safe for @p path to be a temporary (there is no need to worry about its lifetime)
> + future<inode_t> path_lookup(const std::string& path) const;
> +
> +public:
> + template<class Func>
As usual, constraint, and consider noncopyable_function.
I see you support two signatures, so you may have to split into two
functions.
> + future<> iterate_directory(const std::string& dir_path, Func func) {
> + static_assert(std::is_invocable_r_v<future<>, Func, const std::string&> or
> + std::is_invocable_r_v<future<stop_iteration>, Func, const std::string&>);
> + auto convert_func = [&]() -> decltype(auto) {
> + if constexpr (std::is_invocable_r_v<future<stop_iteration>, Func, const std::string&>) {
> + return std::move(func);
> + } else {
> + return [func = std::move(func)]() -> future<stop_iteration> {
> + return func().then([] {
where's the string parameter?
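What I would expect the adapter to look like, sketched synchronously without futures to keep it self-contained. stop_iteration below is a local stand-in for seastar's type and make_entry_callback is a hypothetical name.

```cpp
#include <string>
#include <type_traits>
#include <utility>

enum class stop_iteration { no, yes }; // local stand-in, not seastar's type

// Normalizes both supported callback shapes to one that returns stop_iteration.
template<class Func>
auto make_entry_callback(Func func) {
    if constexpr (std::is_invocable_r_v<stop_iteration, Func&, const std::string&>) {
        return func; // already reports whether to stop
    } else {
        static_assert(std::is_invocable_v<Func&, const std::string&>);
        // The wrapper takes the entry name and forwards it to the wrapped callback.
        return [func = std::move(func)](const std::string& entry) mutable {
            func(entry);
            return stop_iteration::no;
        };
    }
}
```

Splitting iterate_directory into two overloads, one per signature, would avoid the if constexpr altogether.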
> + return stop_iteration::no;
> + });
> + };
> + }
> + };
> + return path_lookup(dir_path).then([this, func = convert_func()](inode_t dir_inode) {
> + return do_with(std::move(func), std::string {}, [this, dir_inode](auto& func, auto& prev_entry) {
> + auto it = _inodes.find(dir_inode);
> + if (it == _inodes.end()) {
> + return now(); // Directory disappeared
> + }
> + if (not it->second.is_directory()) {
> + return make_exception_future(path_component_not_directory_exception());
> + }
> +
> + return repeat([this, dir_inode, &prev_entry, &func] {
> + auto it = _inodes.find(dir_inode);
> + if (it == _inodes.end()) {
> + return make_ready_future<stop_iteration>(stop_iteration::yes); // Directory disappeared
> + }
> + assert(it->second.is_directory() and "Directory cannot become a file");
> + auto& dir = it->second.get_directory();
> +
> + auto entry_it = dir.entries.upper_bound(prev_entry);
> + if (entry_it == dir.entries.end()) {
> + return make_ready_future<stop_iteration>(stop_iteration::yes); // No more entries
> + }
> +
> + prev_entry = entry_it->first;
> + return func(static_cast<const std::string&>(prev_entry));
Why the cast?
> + });
> + });
> + });
> + }
> +
> + // Returns size of the file or throws exception iff @p inode is invalid
> + file_offset_t file_size(inode_t inode) const;
> +
> + // All disk-related errors will be exposed here
> + future<> flush_log() {
> + return flush_curr_cluster();
> + }
> +};
> +
> +} // namespace seastar::fs
> diff --git a/src/fs/metadata_log_bootstrap.hh b/src/fs/metadata_log_bootstrap.hh
> new file mode 100644
> index 00000000..5da79631
> --- /dev/null
> +++ b/src/fs/metadata_log_bootstrap.hh
> @@ -0,0 +1,123 @@
> +/*
> + * This file is open source software, licensed to you under the terms
> + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
> + * distributed with this work for additional information regarding copyright
> + * ownership. You may not use this file except in compliance with the License.
> + *
> + * You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing,
> + * software distributed under the License is distributed on an
> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> + * KIND, either express or implied. See the License for the
> + * specific language governing permissions and limitations
> + * under the License.
> + */
> +/*
> + * Copyright (C) 2020 ScyllaDB
> + */
> +
> +#pragma once
> +
> +#include "fs/bitwise.hh"
> +#include "fs/cluster.hh"
> +#include "fs/inode.hh"
> +#include "fs/inode_info.hh"
> +#include "fs/metadata_disk_entries.hh"
> +#include "fs/metadata_to_disk_buffer.hh"
> +#include "fs/units.hh"
> +#include "fs/metadata_log.hh"
> +#include "seastar/core/do_with.hh"
> +#include "seastar/core/future-util.hh"
> +#include "seastar/core/future.hh"
> +#include "seastar/core/temporary_buffer.hh"
> +
> +#include <boost/crc.hpp>
> +#include <cstddef>
> +#include <cstring>
> +#include <unordered_set>
> +#include <variant>
> +
> +namespace seastar::fs {
> +
> +// TODO: add a comment about what it is
> +class data_reader {
> + const uint8_t* _data = nullptr;
> + size_t _size = 0;
> + size_t _pos = 0;
> + size_t _last_checkpointed_pos = 0;
> +
> +public:
> + data_reader() = default;
> +
> + data_reader(const uint8_t* data, size_t size) : _data(data), _size(size) {}
> +
> + size_t curr_pos() const noexcept { return _pos; }
> +
> + size_t last_checkpointed_pos() const noexcept { return _last_checkpointed_pos; }
> +
> + size_t bytes_left() const noexcept { return _size - _pos; }
> +
> + void align_curr_pos(size_t alignment) noexcept { _pos = round_up_to_multiple_of_power_of_2(_pos, alignment); }
> +
> + void checkpoint_curr_pos() noexcept { _last_checkpointed_pos = _pos; }
> +
> + // Returns whether the reading was successful
> + bool read(void* destination, size_t size);
> +
> + // Returns whether the reading was successful
> + template<class T>
> + bool read_entry(T& entry) noexcept {
> + return read(&entry, sizeof(entry));
> + }
> +
> + // Returns whether the reading was successful
> + bool read_string(std::string& str, size_t size);
> +
> + std::optional<temporary_buffer<uint8_t>> read_tmp_buff(size_t size);
> +
> + // Returns whether the processing was successful
> + bool process_crc_without_reading(boost::crc_32_type& crc, size_t size);
> +
> + std::optional<data_reader> extract(size_t size);
> +};
> +
> +class metadata_log_bootstrap {
> + metadata_log& _metadata_log;
> + cluster_range _available_clusters;
> + std::unordered_set<cluster_id_t> _taken_clusters;
> + std::optional<cluster_id_t> _next_cluster;
> + temporary_buffer<uint8_t> _curr_cluster_data;
> + data_reader _curr_cluster;
> + data_reader _curr_checkpoint;
> +
> + metadata_log_bootstrap(metadata_log& metadata_log, cluster_range available_clusters);
> +
> + future<> bootstrap(cluster_id_t first_metadata_cluster_id, fs_shard_id_t fs_shards_pool_size,
> + fs_shard_id_t fs_shard_id);
> +
> + future<> bootstrap_cluster(cluster_id_t curr_cluster);
> +
> + static auto invalid_entry_exception() {
> + return make_exception_future<>(std::runtime_error("Invalid metadata log entry"));
> + }
> +
> + future<> bootstrap_read_cluster();
> +
> + // Returns whether reading and checking was successful
> + bool read_and_check_checkpoint();
> +
> + future<> bootstrap_checkpointed_data();
> +
> + future<> bootstrap_next_metadata_cluster();
> +
> + bool inode_exists(inode_t inode);
> +
> +public:
> + static future<> bootstrap(metadata_log& metadata_log, inode_t root_dir, cluster_id_t first_metadata_cluster_id,
> + cluster_range available_clusters, fs_shard_id_t fs_shards_pool_size, fs_shard_id_t fs_shard_id);
> +};
> +
> +} // namespace seastar::fs
> diff --git a/src/fs/metadata_to_disk_buffer.hh b/src/fs/metadata_to_disk_buffer.hh
> new file mode 100644
> index 00000000..bd60f4f3
> --- /dev/null
> +++ b/src/fs/metadata_to_disk_buffer.hh
> @@ -0,0 +1,158 @@
> +/*
> + * This file is open source software, licensed to you under the terms
> + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
> + * distributed with this work for additional information regarding copyright
> + * ownership. You may not use this file except in compliance with the License.
> + *
> + * You may obtain a copy of the License at
> + *
> + *   http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing,
> + * software distributed under the License is distributed on an
> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> + * KIND, either express or implied. See the License for the
> + * specific language governing permissions and limitations
> + * under the License.
> + */
> +/*
> + * Copyright (C) 2019 ScyllaDB
> + */
> +
> +#pragma once
> +
> +#include "fs/bitwise.hh"
> +#include "fs/metadata_disk_entries.hh"
> +#include "fs/to_disk_buffer.hh"
> +
> +#include <boost/crc.hpp>
> +
> +namespace seastar::fs {
> +
> +// Represents buffer that will be written to a block_device. Method init() should be called just after constructor
> +// in order to finish construction.
> +class metadata_to_disk_buffer : protected to_disk_buffer {
> + boost::crc_32_type _crc;
Please use crc32c, which has hardware implementations. Later, we can
contribute the optimized implementation from Scylla.
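
For reference, a minimal software sketch of CRC32C (Castagnoli polynomial, reflected form 0x82F63B78), which is the checksum the x86 SSE4.2 crc32 instruction computes. The name crc32c_update is hypothetical and this bitwise loop is only an illustration, not the optimized Scylla implementation:

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical helper: bitwise CRC32C (Castagnoli), reflected polynomial 0x82F63B78.
// Pass 0 as the initial crc; feed the previous return value to continue a running checksum.
inline uint32_t crc32c_update(uint32_t crc, const void* data, size_t len) noexcept {
    assert(data != nullptr || len == 0);
    const auto* p = static_cast<const uint8_t*>(data);
    crc = ~crc; // undo the final inversion of the previous call (or apply the initial one)
    for (size_t i = 0; i < len; ++i) {
        crc ^= p[i];
        for (int bit = 0; bit < 8; ++bit) {
            // Shift right by one; xor in the polynomial if the bit shifted out was set
            crc = (crc >> 1) ^ (0x82F63B78u & (0u - (crc & 1u)));
        }
    }
    return ~crc;
}
```

The standard check input "123456789" should give 0xE3069283, and splitting the input across several calls should give the same value, which matches how append_bytes() feeds the checksum piece by piece.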
> +
> +public:
> + metadata_to_disk_buffer() = default;
> +
> + using to_disk_buffer::init; // Explicitly stated that stays the same
> +
> + virtual shared_ptr<metadata_to_disk_buffer> virtual_constructor() const {
> + return make_shared<metadata_to_disk_buffer>();
> + }
What's this?
> +
> + /**
> + * @brief Inits object, leaving it in state as if just after flushing with unflushed data end at
> + * @p cluster_beg_offset
> + *
> + * @param aligned_max_size size of the buffer, must be aligned
> + * @param alignment write alignment
> + * @param cluster_beg_offset disk offset of the beginning of the cluster
> + * @param metadata_end_pos position at which valid metadata ends: valid metadata range: [0, @p metadata_end_pos)
> + */
> + virtual void init_from_bootstrapped_cluster(size_t aligned_max_size, unit_size_t alignment,
> + disk_offset_t cluster_beg_offset, size_t metadata_end_pos) {
> + assert(is_power_of_2(alignment));
> + assert(mod_by_power_of_2(aligned_max_size, alignment) == 0);
> + assert(mod_by_power_of_2(cluster_beg_offset, alignment) == 0);
> + assert(aligned_max_size >= sizeof(ondisk_type) + sizeof(ondisk_checkpoint));
> + assert(alignment >= sizeof(ondisk_type) + sizeof(ondisk_checkpoint) + sizeof(ondisk_type) +
> + sizeof(ondisk_next_metadata_cluster) and
> + "We always need to be able to pack at least a checkpoint and next_metadata_cluster entry to the last "
> + "data flush in the cluster");
> + assert(metadata_end_pos < aligned_max_size);
> +
> + _max_size = aligned_max_size;
> + _alignment = alignment;
> + _cluster_beg_offset = cluster_beg_offset;
> + auto aligned_pos = round_up_to_multiple_of_power_of_2(metadata_end_pos, _alignment);
> + _unflushed_data = {aligned_pos, aligned_pos};
> + _buff = decltype(_buff)::aligned(_alignment, _max_size);
> +
> + start_new_unflushed_data();
> + }
> +
> +protected:
> + void start_new_unflushed_data() noexcept override {
> + if (bytes_left() < sizeof(ondisk_type) + sizeof(ondisk_checkpoint) + sizeof(ondisk_type) +
> + sizeof(ondisk_next_metadata_cluster)) {
> + assert(bytes_left() == 0); // alignment has to be big enough to hold checkpoint and next_metadata_cluster
> + return; // No more space
> + }
> +
> + ondisk_type type = INVALID;
> + ondisk_checkpoint checkpoint;
> + std::memset(&checkpoint, 0, sizeof(checkpoint));
> +
> + to_disk_buffer::append_bytes(&type, sizeof(type));
> + to_disk_buffer::append_bytes(&checkpoint, sizeof(checkpoint));
> +
> + _crc.reset();
> + }
> +
> + void prepare_unflushed_data_for_flush() noexcept override {
> + // Make checkpoint valid
> + constexpr ondisk_type checkpoint_type = CHECKPOINT;
> + size_t checkpoint_pos = _unflushed_data.beg + sizeof(checkpoint_type);
> + ondisk_checkpoint checkpoint;
> + checkpoint.checkpointed_data_length = _unflushed_data.end - checkpoint_pos - sizeof(checkpoint);
> + _crc.process_bytes(&checkpoint.checkpointed_data_length, sizeof(checkpoint.checkpointed_data_length));
> + checkpoint.crc32_code = _crc.checksum();
> +
> + std::memcpy(_buff.get_write() + _unflushed_data.beg, &checkpoint_type, sizeof(checkpoint_type));
> + std::memcpy(_buff.get_write() + checkpoint_pos, &checkpoint, sizeof(checkpoint));
> + }
> +
> +public:
> + using to_disk_buffer::bytes_left_after_flush_if_done_now; // Explicitly stated that stays the same
> +
> +private:
> + void append_bytes(const void* data, size_t len) noexcept override {
> + to_disk_buffer::append_bytes(data, len);
> + _crc.process_bytes(data, len);
> + }
> +
> +public:
> + enum append_result {
> + APPENDED,
> + TOO_BIG,
> + };
> +
> + [[nodiscard]] virtual append_result append(const ondisk_next_metadata_cluster& next_metadata_cluster) noexcept {
> + ondisk_type type = NEXT_METADATA_CLUSTER;
> + if (bytes_left() < ondisk_entry_size(next_metadata_cluster)) {
> + return TOO_BIG;
> + }
> +
> + append_bytes(&type, sizeof(type));
> + append_bytes(&next_metadata_cluster, sizeof(next_metadata_cluster));
> + return APPENDED;
> + }
> +
> + using to_disk_buffer::bytes_left;
> +
> +protected:
> + bool fits_for_append(size_t bytes_no) const noexcept {
> + // We need to reserve space for the next metadata cluster entry
> + return (bytes_left() >= bytes_no + sizeof(ondisk_type) + sizeof(ondisk_next_metadata_cluster));
> + }
> +
> +private:
> + template<class T>
> + [[nodiscard]] append_result append_simple(ondisk_type type, const T& entry) noexcept {
> + if (not fits_for_append(ondisk_entry_size(entry))) {
> + return TOO_BIG;
> + }
> +
> + append_bytes(&type, sizeof(type));
> + append_bytes(&entry, sizeof(entry));
> + return APPENDED;
> + }
> +
> +public:
> + using to_disk_buffer::flush_to_disk;
> +};
> +
> +} // namespace seastar::fs
> diff --git a/src/fs/to_disk_buffer.hh b/src/fs/to_disk_buffer.hh
> new file mode 100644
> index 00000000..612f26d2
> --- /dev/null
> +++ b/src/fs/to_disk_buffer.hh
> @@ -0,0 +1,138 @@
> +/*
> + * This file is open source software, licensed to you under the terms
> + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
> + * distributed with this work for additional information regarding copyright
> + * ownership. You may not use this file except in compliance with the License.
> + *
> + * You may obtain a copy of the License at
> + *
> + *   http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing,
> + * software distributed under the License is distributed on an
> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> + * KIND, either express or implied. See the License for the
> + * specific language governing permissions and limitations
> + * under the License.
> + */
> +/*
> + * Copyright (C) 2019 ScyllaDB
> + */
> +
> +#pragma once
> +
> +#include "fs/bitwise.hh"
> +#include "fs/units.hh"
> +#include "seastar/core/future.hh"
> +#include "seastar/core/temporary_buffer.hh"
> +#include "seastar/fs/block_device.hh"
> +
> +#include <cstring>
> +
> +namespace seastar::fs {
> +
> +// Represents buffer that will be written to a block_device. Method init() should be called just after constructor
> +// in order to finish construction.
> +class to_disk_buffer {
Please split this into a separate patch. It's big enough and it's hard
to understand the preceding code without it.
> +protected:
> + temporary_buffer<uint8_t> _buff;
> + size_t _max_size = 0;
> + unit_size_t _alignment = 0;
> + disk_offset_t _cluster_beg_offset = 0; // disk offset that corresponds to _buff.begin()
> + range<size_t> _unflushed_data = {0, 0}; // range of unflushed bytes in _buff
> +
> +public:
> + to_disk_buffer() = default;
> +
> + to_disk_buffer(const to_disk_buffer&) = delete;
> + to_disk_buffer& operator=(const to_disk_buffer&) = delete;
> + to_disk_buffer(to_disk_buffer&&) = default;
> + to_disk_buffer& operator=(to_disk_buffer&&) = default;
> +
> + // Total number of bytes appended cannot exceed @p aligned_max_size.
> + // @p cluster_beg_offset is the disk offset of the beginning of the cluster.
> + virtual void init(size_t aligned_max_size, unit_size_t alignment, disk_offset_t cluster_beg_offset) {
> + assert(is_power_of_2(alignment));
> + assert(mod_by_power_of_2(aligned_max_size, alignment) == 0);
> + assert(mod_by_power_of_2(cluster_beg_offset, alignment) == 0);
> +
> + _max_size = aligned_max_size;
> + _alignment = alignment;
> + _cluster_beg_offset = cluster_beg_offset;
> + _unflushed_data = {0, 0};
> + _buff = decltype(_buff)::aligned(_alignment, _max_size);
> + start_new_unflushed_data();
> + }
> +
Why is this virtual? I'm missing some theory of operation.
> + virtual ~to_disk_buffer() = default;
> +
> + /**
> + * @brief Writes buffered (unflushed) data to disk and starts a new unflushed data if there is enough space
> + * IMPORTANT: using this buffer before call to flush_to_disk() completes is perfectly OK
> + * @details After each flush we align the offset at which the new unflushed data is continued. This is very
> + * important, as it ensures that consecutive flushes, as their underlying write operations to a block device,
> + * do not overlap. If the writes overlapped, it would be possible that they would be written in the reverse order
> + * corrupting the on-disk data.
> + *
> + * @param device output device
> + */
> + virtual future<> flush_to_disk(block_device device) {
> + prepare_unflushed_data_for_flush();
> + // Data layout overview:
> + // |.........................|00000000000000000000000|
> + // ^ _unflushed_data.beg ^ _unflushed_data.end ^ real_write.end
> + // (aligned) (maybe unaligned) (aligned)
> + // == real_write.beg == new _unflushed_data.beg
> + // |<------ padding ------>|
> + assert(mod_by_power_of_2(_unflushed_data.beg, _alignment) == 0);
> + range real_write = {
> + _unflushed_data.beg,
> + round_up_to_multiple_of_power_of_2(_unflushed_data.end, _alignment),
> + };
> + // Pad buffer with zeros till alignment
> + range padding = {_unflushed_data.end, real_write.end};
> + std::memset(_buff.get_write() + padding.beg, 0, padding.size());
> +
> + // Make sure the buffer is usable before returning from this function
> + _unflushed_data = {real_write.end, real_write.end};
> + start_new_unflushed_data();
> +
> + return device.write(_cluster_beg_offset + real_write.beg, _buff.get_write() + real_write.beg, real_write.size())
> + .then([real_write](size_t written_bytes) {
> + if (written_bytes != real_write.size()) {
> + return make_exception_future<>(std::runtime_error("Partial write"));
> + // TODO: maybe add some way to retry write, because once the buffer is corrupt nothing can be done now
> + }
> +
> + return now();
> + });
> + }
> +
> +protected:
> + // May be called before flushing of the previous fragment is done
> + virtual void start_new_unflushed_data() noexcept {}
> +
> + virtual void prepare_unflushed_data_for_flush() noexcept {}
> +
> +public:
> + virtual void append_bytes(const void* data, size_t len) noexcept {
> + assert(len <= bytes_left());
> + std::memcpy(_buff.get_write() + _unflushed_data.end, data, len);
> + _unflushed_data.end += len;
> + }
> +
> + // Returns maximum number of bytes that may be written to buffer without calling reset()
> + virtual size_t bytes_left() const noexcept { return _max_size - _unflushed_data.end; }
> +
It looks wrong to have such a simple function virtual, but I admit I'm
lost here.
> + virtual size_t bytes_left_after_flush_if_done_now() const noexcept {
> + return _max_size - round_up_to_multiple_of_power_of_2(_unflushed_data.end, _alignment);
> + }
> +
> + // Returns disk offset of the place where the first byte of next appended bytes would be after flush
> + // TODO: maybe better name for that function? Or any other method to extract that data?
> + virtual disk_offset_t current_disk_offset() const noexcept {
> + return _cluster_beg_offset + _unflushed_data.end;
> + }
> +};
> +
> +} // namespace seastar::fs
> diff --git a/src/fs/unix_metadata.hh b/src/fs/unix_metadata.hh
> new file mode 100644
> index 00000000..6f634044
> --- /dev/null
> +++ b/src/fs/unix_metadata.hh
> @@ -0,0 +1,40 @@
> +/*
> + * This file is open source software, licensed to you under the terms
> + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
> + * distributed with this work for additional information regarding copyright
> + * ownership. You may not use this file except in compliance with the License.
> + *
> + * You may obtain a copy of the License at
> + *
> + *   http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing,
> + * software distributed under the License is distributed on an
> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> + * KIND, either express or implied. See the License for the
> + * specific language governing permissions and limitations
> + * under the License.
> + */
> +/*
> + * Copyright (C) 2019 ScyllaDB
> + */
> +
> +#pragma once
> +
> +#include "seastar/core/file-types.hh"
> +
> +#include <cstdint>
> +#include <sys/types.h>
> +
> +namespace seastar::fs {
> +
> +struct unix_metadata {
> + file_permissions perms;
> + uid_t uid;
> + gid_t gid;
If these go to disk, they should have an implementation-independent
definition, and the thing should be packed.
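
Something along these lines, as a sketch only. The name ondisk_unix_metadata, the choice of 32-bit fields for perms/uid/gid and the to_ondisk() helper are my assumptions, and byte order is not addressed here:

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <type_traits>

// Hypothetical on-disk form: fixed-width fields only, no compiler-inserted padding.
// __attribute__((packed)) is the GCC/Clang spelling.
struct ondisk_unix_metadata {
    uint32_t perms;
    uint32_t uid;
    uint32_t gid;
    uint64_t btime_ns;
    uint64_t mtime_ns;
    uint64_t ctime_ns;
} __attribute__((packed));

static_assert(sizeof(ondisk_unix_metadata) == 3 * sizeof(uint32_t) + 3 * sizeof(uint64_t),
        "no padding expected");
static_assert(offsetof(ondisk_unix_metadata, btime_ns) == 12, "timestamps follow the ids directly");
static_assert(std::is_trivially_copyable_v<ondisk_unix_metadata>, "must be safe to memcpy");

// Hypothetical conversion from the in-memory values; ids are taken as 64-bit here
// so that the narrowing to the on-disk width is explicit and checked.
inline ondisk_unix_metadata to_ondisk(uint32_t perms, uint64_t uid, uint64_t gid,
        uint64_t btime_ns, uint64_t mtime_ns, uint64_t ctime_ns) {
    assert(uid <= UINT32_MAX && gid <= UINT32_MAX);
    return {perms, static_cast<uint32_t>(uid), static_cast<uint32_t>(gid), btime_ns, mtime_ns, ctime_ns};
}
```

Without the packed attribute, the usual 8-byte alignment of uint64_t would insert 4 bytes of padding after gid, so the layout would depend on the ABI.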
> + uint64_t btime_ns;
> + uint64_t mtime_ns;
> + uint64_t ctime_ns;
> +};
> +
This can also go into its own patch.
> +} // namespace seastar::fs
> diff --git a/src/fs/metadata_log.cc b/src/fs/metadata_log.cc
> new file mode 100644
> index 00000000..6e29f2e5
> --- /dev/null
> +++ b/src/fs/metadata_log.cc
> @@ -0,0 +1,222 @@
> +/*
> + * This file is open source software, licensed to you under the terms
> + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
> + * distributed with this work for additional information regarding copyright
> + * ownership. You may not use this file except in compliance with the License.
> + *
> + * You may obtain a copy of the License at
> + *
> + *   http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing,
> + * software distributed under the License is distributed on an
> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> + * KIND, either express or implied. See the License for the
> + * specific language governing permissions and limitations
> + * under the License.
> + */
> +/*
> + * Copyright (C) 2020 ScyllaDB
> + */
> +
> +#include "fs/cluster.hh"
> +#include "fs/cluster_allocator.hh"
> +#include "fs/inode.hh"
> +#include "fs/inode_info.hh"
> +#include "fs/metadata_disk_entries.hh"
> +#include "fs/metadata_log.hh"
> +#include "fs/metadata_log_bootstrap.hh"
> +#include "fs/metadata_to_disk_buffer.hh"
> +#include "fs/path.hh"
> +#include "fs/units.hh"
> +#include "fs/unix_metadata.hh"
> +#include "seastar/core/aligned_buffer.hh"
> +#include "seastar/core/do_with.hh"
> +#include "seastar/core/file-types.hh"
> +#include "seastar/core/future-util.hh"
> +#include "seastar/core/future.hh"
> +#include "seastar/core/shared_mutex.hh"
> +#include "seastar/fs/overloaded.hh"
> +
> +#include <boost/crc.hpp>
> +#include <boost/range/irange.hpp>
> +#include <chrono>
> +#include <cstddef>
> +#include <cstdint>
> +#include <cstring>
> +#include <limits>
> +#include <stdexcept>
> +#include <string_view>
> +#include <unordered_set>
> +#include <variant>
> +
> +namespace seastar::fs {
> +
> +metadata_log::metadata_log(block_device device, uint32_t cluster_size, uint32_t alignment,
> + shared_ptr<metadata_to_disk_buffer> cluster_buff)
> +: _device(std::move(device))
> +, _cluster_size(cluster_size)
> +, _alignment(alignment)
> +, _curr_cluster_buff(std::move(cluster_buff))
> +, _cluster_allocator({}, {})
> +, _inode_allocator(1, 0) {
> +
We generally double-indent such.
> + assert(is_power_of_2(alignment));
> + assert(cluster_size > 0 and cluster_size % alignment == 0);
> +}
> +
> +metadata_log::metadata_log(block_device device, unit_size_t cluster_size, unit_size_t alignment)
> +: metadata_log(std::move(device), cluster_size, alignment,
> + make_shared<metadata_to_disk_buffer>()) {}
> +
> +future<> metadata_log::bootstrap(inode_t root_dir, cluster_id_t first_metadata_cluster_id, cluster_range available_clusters,
> + fs_shard_id_t fs_shards_pool_size, fs_shard_id_t fs_shard_id) {
> + return metadata_log_bootstrap::bootstrap(*this, root_dir, first_metadata_cluster_id, available_clusters,
> + fs_shards_pool_size, fs_shard_id);
> +}
> +
> +future<> metadata_log::shutdown() {
> + return flush_log().then([this] {
> + return _device.close();
> + });
> +}
> +
> +void metadata_log::schedule_flush_of_curr_cluster() {
> + // Make writes concurrent (TODO: maybe serialized within *one* cluster would be faster?)
> + schedule_background_task(do_with(_curr_cluster_buff, &_device, [](auto& crr_clstr_bf, auto& device) {
> + return crr_clstr_bf->flush_to_disk(*device);
> + }));
> +}
> +
> +future<> metadata_log::flush_curr_cluster() {
> + if (_curr_cluster_buff->bytes_left_after_flush_if_done_now() == 0) {
> + switch (schedule_flush_of_curr_cluster_and_change_it_to_new_one()) {
> + case flush_result::NO_SPACE:
> + return make_exception_future(no_more_space_exception());
> + case flush_result::DONE:
> + break;
> + }
> + } else {
> + schedule_flush_of_curr_cluster();
> + }
> +
> + return _background_futures.get_future();
> +}
> +
> +metadata_log::flush_result metadata_log::schedule_flush_of_curr_cluster_and_change_it_to_new_one() {
> + auto next_cluster = _cluster_allocator.alloc();
> + if (not next_cluster) {
> + // Here metadata log dies, we cannot even flush current cluster because from there we won't be able to recover
> + // TODO: ^ add protection from it and take it into account during compaction
Yes, we need two thresholds for allocation: data cluster allocations
should fail early, with a reserve that is enough for compaction for
metadata clusters. Perhaps we need three thresholds, one for data
clusters, one for new metadata clusters (allowing file deletions only),
and one for metadata log compaction (allowing allocating the very last
cluster on disk).
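
Roughly what I have in mind, as a sketch. The class name, the alloc_purpose enum and the reserve parameters are all hypothetical, and how large the reserves must be is not worked out here:

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <optional>
#include <utility>

using cluster_id_t = uint64_t; // assumption: stands in for the real cluster_id_t

enum class alloc_purpose { data, metadata, compaction };

// Hypothetical allocator with three thresholds: data allocations fail first,
// then new metadata clusters, and only compaction may take the very last cluster.
class reserving_cluster_allocator {
    std::deque<cluster_id_t> _free;
    size_t _metadata_reserve;   // clusters withheld from data allocations
    size_t _compaction_reserve; // clusters withheld from everything except compaction
public:
    reserving_cluster_allocator(std::deque<cluster_id_t> free_clusters, size_t metadata_reserve,
            size_t compaction_reserve)
    : _free(std::move(free_clusters))
    , _metadata_reserve(metadata_reserve)
    , _compaction_reserve(compaction_reserve) {}

    std::optional<cluster_id_t> alloc(alloc_purpose purpose) {
        size_t must_remain = 0; // free clusters that have to be left after this allocation
        switch (purpose) {
        case alloc_purpose::data:
            must_remain = _metadata_reserve + _compaction_reserve;
            break;
        case alloc_purpose::metadata:
            must_remain = _compaction_reserve;
            break;
        case alloc_purpose::compaction:
            must_remain = 0;
            break;
        }
        if (_free.size() <= must_remain) {
            return std::nullopt;
        }
        cluster_id_t id = _free.front();
        _free.pop_front();
        return id;
    }

    size_t free_count() const noexcept { return _free.size(); }
};
```

With this shape, running out of data clusters still leaves room to log deletions, and running out of metadata clusters still leaves room to compact.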
> + return flush_result::NO_SPACE;
> + }
> +
> + auto append_res = _curr_cluster_buff->append(ondisk_next_metadata_cluster {*next_cluster});
> + assert(append_res == metadata_to_disk_buffer::APPENDED);
> + schedule_flush_of_curr_cluster();
> +
> + // Make next cluster the current cluster to allow writing next metadata entries before flushing finishes
> + _curr_cluster_buff->virtual_constructor();
> + _curr_cluster_buff->init(_cluster_size, _alignment,
> + cluster_id_to_offset(*next_cluster, _cluster_size));
> + return flush_result::DONE;
> +}
> +
> +std::variant<inode_t, metadata_log::path_lookup_error> metadata_log::do_path_lookup(const std::string& path) const noexcept {
Maybe the in-memory representation should be split from the metadata
log; they are both liable to grow.
> + if (path.empty() or path[0] != '/') {
> + return path_lookup_error::NOT_ABSOLUTE;
> + }
> +
> + std::vector<inode_t> components_stack = {_root_dir};
> + size_t beg = 0;
> + while (beg < path.size()) {
> + range component_range = {beg, path.find('/', beg)};
> + bool check_if_dir = false;
> + if (component_range.end == path.npos) {
> + component_range.end = path.size();
> + beg = path.size();
> + } else {
> + check_if_dir = true;
> + beg = component_range.end + 1; // Jump over '/'
Does this deal with "a////b"?
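
From reading the loop, repeated slashes produce empty components, which then hit the component == "" branch and get skipped, so it probably does. A test would settle it. The tokenization on its own can be sketched like this (split_path is a hypothetical name, and "." and ".." are deliberately left unresolved):

```cpp
#include <cassert>
#include <cstddef>
#include <string_view>
#include <vector>

// Hypothetical helper mirroring only the splitting part of do_path_lookup():
// cut at every '/', drop the empty components that repeated slashes produce.
inline std::vector<std::string_view> split_path(std::string_view path) {
    std::vector<std::string_view> components;
    size_t beg = 0;
    while (beg < path.size()) {
        size_t end = path.find('/', beg);
        if (end == std::string_view::npos) {
            end = path.size();
        }
        std::string_view component = path.substr(beg, end - beg);
        beg = end + 1; // Jump over '/' (or past the end, which terminates the loop)
        if (!component.empty()) {
            components.push_back(component);
        }
    }
    return components;
}
```

Here split_path("/a////b") and split_path("/a/b") yield the same two components.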
> + }
> +
> + std::string_view component(path.data() + component_range.beg, component_range.size());
> + // Process the component
> + if (component == "") {
> + continue;
> + } else if (component == ".") {
> + assert(component_range.beg > 0 and path[component_range.beg - 1] == '/' and "Since path is absolute we do not have to check if the current component is a directory");
> + continue;
> + } else if (component == "..") {
> + if (components_stack.size() > 1) { // Root dir cannot be popped
> + components_stack.pop_back();
> + }
> + } else {
> + auto dir_it = _inodes.find(components_stack.back());
> + assert(dir_it != _inodes.end() and "inode comes from some previous lookup (or is a root directory) hence dir_it has to be valid");
> + assert(dir_it->second.is_directory() and "every previous component is a directory and it was checked when they were processed");
> + auto& curr_dir = dir_it->second.get_directory();
> +
> + auto it = curr_dir.entries.find(component);
> + if (it == curr_dir.entries.end()) {
> + return path_lookup_error::NO_ENTRY;
> + }
> +
> + inode_t entry_inode = it->second;
> + if (check_if_dir) {
> + auto entry_it = _inodes.find(entry_inode);
> + assert(entry_it != _inodes.end() and "dir entries have to exist");
> + if (not entry_it->second.is_directory()) {
> + return path_lookup_error::NOT_DIR;
> + }
> + }
> +
> + components_stack.emplace_back(entry_inode);
> + }
> + }
> +
> + return components_stack.back();
> +}
> +
> +future<inode_t> metadata_log::path_lookup(const std::string& path) const {
> + auto lookup_res = do_path_lookup(path);
> + return std::visit(overloaded {
> + [](path_lookup_error error) {
> + switch (error) {
> + case path_lookup_error::NOT_ABSOLUTE:
> + return make_exception_future<inode_t>(path_is_not_absolute_exception());
> + case path_lookup_error::NO_ENTRY:
> + return make_exception_future<inode_t>(no_such_file_or_directory_exception());
> + case path_lookup_error::NOT_DIR:
> + return make_exception_future<inode_t>(path_component_not_directory_exception());
> + }
> + __builtin_unreachable();
> + },
> + [](inode_t inode) {
> + return make_ready_future<inode_t>(inode);
> + }
> + }, lookup_res);
> +}
> +
> +file_offset_t metadata_log::file_size(inode_t inode) const {
> + auto it = _inodes.find(inode);
> + if (it == _inodes.end()) {
> + throw invalid_inode_exception();
> + }
> +
> + return std::visit(overloaded {
> + [](const inode_info::file& file) {
> + return file.size();
> + },
> + [](const inode_info::directory&) -> file_offset_t {
> + throw invalid_inode_exception();
You can return the number of entries in a directory. ls -l doesn't throw
when one of the entries is a directory.
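
A sketch of that alternative, with stand-in types since inode_info is not repeated here. file_contents, directory_contents and size_of are hypothetical names, and overloaded is spelled out only to keep the snippet self-contained:

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <variant>

using inode_t = uint64_t;       // assumption: stands in for the real inode_t
using file_offset_t = uint64_t; // assumption: stands in for the real file_offset_t

// Stand-ins for inode_info::file and inode_info::directory
struct file_contents {
    file_offset_t size_bytes = 0;
};
struct directory_contents {
    std::map<std::string, inode_t> entries;
};

template<class... Ts> struct overloaded : Ts... { using Ts::operator()...; };
template<class... Ts> overloaded(Ts...) -> overloaded<Ts...>;

// For a file return its size in bytes; for a directory return the number of entries
// instead of throwing.
inline file_offset_t size_of(const std::variant<file_contents, directory_contents>& contents) {
    return std::visit(overloaded {
        [](const file_contents& file) -> file_offset_t {
            return file.size_bytes;
        },
        [](const directory_contents& dir) -> file_offset_t {
            return dir.entries.size();
        }
    }, contents);
}
```

The invalid_inode_exception for a missing inode would stay as it is; only the directory branch changes.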
> + }
> + }, it->second.contents);
> +}
> +
> +// TODO: think about how to make filesystem recoverable from ENOSPACE situation: flush() (or something else) throws ENOSPACE,
> +// then it should be possible to compact some data (e.g. by truncating a file) via top-level interface and retrying the flush()
> +// without a ENOSPACE error. In particular if we delete all files after ENOSPACE it should be successful. It becomes especially
> +// hard if we write metadata to the last cluster and there is no enough room to write these delete operations. We have to
> +// guarantee that the filesystem is in a recoverable state then.
Yes, I posted some ideas earlier.
> +
> +} // namespace seastar::fs
> diff --git a/src/fs/metadata_log_bootstrap.cc b/src/fs/metadata_log_bootstrap.cc
> new file mode 100644
> index 00000000..926d79fe
> --- /dev/null
> +++ b/src/fs/metadata_log_bootstrap.cc
> @@ -0,0 +1,264 @@
> +/*
> + * This file is open source software, licensed to you under the terms
> + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
> + * distributed with this work for additional information regarding copyright
> + * ownership. You may not use this file except in compliance with the License.
> + *
> + * You may obtain a copy of the License at
> + *
> + *   http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing,
> + * software distributed under the License is distributed on an
> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> + * KIND, either express or implied. See the License for the
> + * specific language governing permissions and limitations
> + * under the License.
> + */
> +/*
> + * Copyright (C) 2020 ScyllaDB
> + */
> +
> +#include "fs/bitwise.hh"
> +#include "fs/metadata_disk_entries.hh"
> +#include "fs/metadata_log_bootstrap.hh"
> +#include "seastar/util/log.hh"
> +
> +namespace seastar::fs {
> +
> +namespace {
> +logger mlogger("fs_metadata_bootstrap");
> +} // namespace
> +
> +bool data_reader::read(void* destination, size_t size) {
> + if (_pos + size > _size) {
> + return false;
> + }
> +
> + std::memcpy(destination, _data + _pos, size);
> + _pos += size;
> + return true;
> +}
> +
> +bool data_reader::read_string(std::string& str, size_t size) {
> + str.resize(size);
> + return read(str.data(), size);
> +}
> +
> +std::optional<temporary_buffer<uint8_t>> data_reader::read_tmp_buff(size_t size) {
> + if (_pos + size > _size) {
> + return std::nullopt;
> + }
> +
> + _pos += size;
> + return temporary_buffer<uint8_t>(_data + _pos - size, size);
> +}
> +
> +bool data_reader::process_crc_without_reading(boost::crc_32_type& crc, size_t size) {
> + if (_pos + size > _size) {
> + return false;
> + }
> +
> + crc.process_bytes(_data + _pos, size);
> + return true;
> +}
> +
> +std::optional<data_reader> data_reader::extract(size_t size) {
> + if (_pos + size > _size) {
> + return std::nullopt;
> + }
> +
> + _pos += size;
> + return data_reader(_data + _pos - size, size);
> +}
> +
> +metadata_log_bootstrap::metadata_log_bootstrap(metadata_log& metadata_log, cluster_range available_clusters)
> +: _metadata_log(metadata_log)
> +, _available_clusters(available_clusters)
> +, _curr_cluster_data(decltype(_curr_cluster_data)::aligned(metadata_log._alignment, metadata_log._cluster_size))
> +{}
> +
> +future<> metadata_log_bootstrap::bootstrap(cluster_id_t first_metadata_cluster_id, fs_shard_id_t fs_shards_pool_size,
> + fs_shard_id_t fs_shard_id) {
> + _next_cluster = first_metadata_cluster_id;
> + mlogger.debug(">>>> Started bootstraping <<<<");
> + return do_with((cluster_id_t)first_metadata_cluster_id, [this](cluster_id_t& last_cluster) {
> + return do_until([this] { return not _next_cluster.has_value(); }, [this, &last_cluster] {
> + cluster_id_t curr_cluster = *_next_cluster;
> + _next_cluster = std::nullopt;
> + bool inserted = _taken_clusters.emplace(curr_cluster).second;
> + assert(inserted); // TODO: check it in next_cluster record
> + last_cluster = curr_cluster;
> + return bootstrap_cluster(curr_cluster);
> + }).then([this, &last_cluster] {
> + mlogger.debug("Data bootstraping is done");
> + // Initialize _curr_cluster_buff
> + _metadata_log._curr_cluster_buff = _metadata_log._curr_cluster_buff->virtual_constructor();
> + mlogger.debug("Initializing _curr_cluster_buff: cluster {}, pos {}", last_cluster, _curr_cluster.last_checkpointed_pos());
> + _metadata_log._curr_cluster_buff->init_from_bootstrapped_cluster(_metadata_log._cluster_size,
> + _metadata_log._alignment, cluster_id_to_offset(last_cluster, _metadata_log._cluster_size),
> + _curr_cluster.last_checkpointed_pos());
> + });
> + }).then([this, fs_shards_pool_size, fs_shard_id] {
> + // Initialize _cluser_allocator
> + mlogger.debug("Initializing cluster allocator");
> + std::deque<cluster_id_t> free_clusters;
> + for (auto cid : boost::irange(_available_clusters.beg, _available_clusters.end)) {
> + if (_taken_clusters.count(cid) == 0) {
> + free_clusters.emplace_back(cid);
> + }
> + }
> + if (free_clusters.empty()) {
> + return make_exception_future(no_more_space_exception());
> + }
> + free_clusters.pop_front();
> +
> + mlogger.debug("free clusters: {}", free_clusters.size());
> + _metadata_log._cluster_allocator = cluster_allocator(std::move(_taken_clusters), std::move(free_clusters));
> +
> + // Reset _inode_allocator
> + std::optional<inode_t> max_inode_no;
> + if (not _metadata_log._inodes.empty()) {
> + max_inode_no =_metadata_log._inodes.rbegin()->first;
> + }
> + _metadata_log._inode_allocator = shard_inode_allocator(fs_shards_pool_size, fs_shard_id, max_inode_no);
> +
> + // TODO: what about orphaned inodes: maybe they are remnants of unlinked files and we need to delete them,
> + // or maybe not?
> + return now();
> + });
You can use a coroutine for this, I don't mind making it C++20-only. It
will be approximately 83,223.772 times more readable.
> +}
> +
> +future<> metadata_log_bootstrap::bootstrap_cluster(cluster_id_t curr_cluster) {
> + disk_offset_t curr_cluster_disk_offset = cluster_id_to_offset(curr_cluster, _metadata_log._cluster_size);
> + mlogger.debug("Bootstraping from cluster {}...", curr_cluster);
> + return _metadata_log._device.read(curr_cluster_disk_offset, _curr_cluster_data.get_write(),
> + _metadata_log._cluster_size).then([this, curr_cluster](size_t bytes_read) {
> + if (bytes_read != _metadata_log._cluster_size) {
> + return make_exception_future(std::runtime_error("Failed to read whole cluster of the metadata log"));
> + }
> +
> + mlogger.debug("Read cluster {}", curr_cluster);
> + _curr_cluster = data_reader(_curr_cluster_data.get(), _metadata_log._cluster_size);
> + return bootstrap_read_cluster();
> + });
> +}
> +
> +future<> metadata_log_bootstrap::bootstrap_read_cluster() {
> + // Process cluster: the data layout format is:
> + // | checkpoint1 | data1... | checkpoint2 | data2... | ... |
> + return do_with(false, [this](bool& whole_log_ended) {
> + return do_until([this, &whole_log_ended] { return whole_log_ended or _next_cluster.has_value(); },
> + [this, &whole_log_ended] {
> + _curr_cluster.align_curr_pos(_metadata_log._alignment);
> + _curr_cluster.checkpoint_curr_pos();
> +
> + if (not read_and_check_checkpoint()) {
> + mlogger.debug("Checkpoint invalid");
> + whole_log_ended = true;
> + return now();
> + }
> +
> + mlogger.debug("Checkpoint correct");
> + return bootstrap_checkpointed_data();
> + }).then([] {
> + mlogger.debug("Cluster ended");
> + });
> + });
> +}
> +
> +bool metadata_log_bootstrap::read_and_check_checkpoint() {
> + mlogger.debug("Processing checkpoint at {}", _curr_cluster.curr_pos());
> + ondisk_type entry_type;
> + ondisk_checkpoint checkpoint;
> + if (not _curr_cluster.read_entry(entry_type)) {
> + mlogger.debug("Cannot read entry type");
> + return false;
> + }
> + if (entry_type != CHECKPOINT) {
> + mlogger.debug("Entry type (= {}) is not CHECKPOINT (= {})", entry_type, CHECKPOINT);
> + return false;
> + }
> + if (not _curr_cluster.read_entry(checkpoint)) {
> + mlogger.debug("Cannot read checkpoint entry");
> + return false;
> + }
> +
> + boost::crc_32_type crc;
> + if (not _curr_cluster.process_crc_without_reading(crc, checkpoint.checkpointed_data_length)) {
> + mlogger.debug("Invalid checkpoint's data length: {}", (unit_size_t)checkpoint.checkpointed_data_length);
> + return false;
> + }
> + crc.process_bytes(&checkpoint.checkpointed_data_length, sizeof(checkpoint.checkpointed_data_length));
> + if (crc.checksum() != checkpoint.crc32_code) {
> + mlogger.debug("CRC code does not match: computed = {}, read = {}", crc.checksum(), (uint32_t)checkpoint.crc32_code);
> + return false;
> + }
> +
> + auto opt = _curr_cluster.extract(checkpoint.checkpointed_data_length);
> + assert(opt.has_value());
> + _curr_checkpoint = *opt;
> + return true;
> +}
> +
> +future<> metadata_log_bootstrap::bootstrap_checkpointed_data() {
> + return do_with(ondisk_type {}, [this](ondisk_type& entry_type) {
> + return do_until([this, &entry_type] { return not _curr_checkpoint.read_entry(entry_type); },
> + [this, &entry_type] {
> + switch (entry_type) {
> + case INVALID:
> + case CHECKPOINT: // CHECKPOINT cannot appear as part of checkpointed data
> + return invalid_entry_exception();
> + case NEXT_METADATA_CLUSTER:
> + return bootstrap_next_metadata_cluster();
> + }
> +
> + // unknown type => metadata log corruption
> + return invalid_entry_exception();
> + }).then([this] {
> + if (_curr_checkpoint.bytes_left() > 0) {
> + return invalid_entry_exception(); // Corrupted checkpointed data
> + }
> + return now();
> + });
> + });
> +}
> +
> +future<> metadata_log_bootstrap::bootstrap_next_metadata_cluster() {
> + ondisk_next_metadata_cluster entry;
> + if (not _curr_checkpoint.read_entry(entry)) {
> + return invalid_entry_exception();
> + }
> +
> + if (_next_cluster.has_value()) {
> + return invalid_entry_exception(); // Only one NEXT_METADATA_CLUSTER may appear in one cluster
> + }
> +
> + _next_cluster = (cluster_id_t)entry.cluster_id;
> + return now();
> +}
> +
> +bool metadata_log_bootstrap::inode_exists(inode_t inode) {
> + return _metadata_log._inodes.count(inode) != 0;
> +}
> +
> +future<> metadata_log_bootstrap::bootstrap(metadata_log& metadata_log, inode_t root_dir, cluster_id_t first_metadata_cluster_id,
> + cluster_range available_clusters, fs_shard_id_t fs_shards_pool_size, fs_shard_id_t fs_shard_id) {
> + // Clear the metadata log
> + metadata_log._inodes.clear();
> + metadata_log._background_futures = now();
> + metadata_log._root_dir = root_dir;
> + metadata_log._inodes.emplace(root_dir, inode_info {
> + 0,
> + 0,
> + {}, // TODO: change it to something meaningful
> + inode_info::directory {}
> + });
> +
> + return do_with(metadata_log_bootstrap(metadata_log, available_clusters),
> + [first_metadata_cluster_id, fs_shards_pool_size, fs_shard_id](metadata_log_bootstrap& bootstrap) {
> + return bootstrap.bootstrap(first_metadata_cluster_id, fs_shards_pool_size, fs_shard_id);
> + });
> +}
> +
> +} // namespace seastar::fs
> diff --git a/CMakeLists.txt b/CMakeLists.txt
> index 8a59eca6..19666a8a 100644
> --- a/CMakeLists.txt
> +++ b/CMakeLists.txt
> @@ -658,6 +658,7 @@ if (Seastar_EXPERIMENTAL_FS)
> PRIVATE
> # SeastarFS source files
> include/seastar/fs/block_device.hh
> + include/seastar/fs/exceptions.hh
> include/seastar/fs/file.hh
> include/seastar/fs/overloaded.hh
> include/seastar/fs/temporary_file.hh
> @@ -670,9 +671,18 @@ if (Seastar_EXPERIMENTAL_FS)
> src/fs/crc.hh
> src/fs/file.cc
> src/fs/inode.hh
> + src/fs/inode_info.hh
> + src/fs/metadata_disk_entries.hh
> + src/fs/metadata_log.cc
> + src/fs/metadata_log.hh
> + src/fs/metadata_log_bootstrap.cc
> + src/fs/metadata_log_bootstrap.hh
> + src/fs/metadata_to_disk_buffer.hh
> src/fs/path.hh
> src/fs/range.hh
> + src/fs/to_disk_buffer.hh
> src/fs/units.hh
> + src/fs/unix_metadata.hh
> src/fs/value_shared_lock.hh
> )
> endif()