[COMMIT seastar master] file: Rework read interface

7 views
Skip to first unread message

Commit Bot

<bot@cloudius-systems.com>
unread,
May 26, 2015, 8:19:47 AM5/26/15
to seastar-dev@googlegroups.com, Vlad Zolotarov
From: Vlad Zolotarov <vl...@cloudius-systems.com>

file: Rework read interface

Move the get() logic in fstream.cc into the file::dma_read_bulk()
fixing some issues:
- Fix the funny "alignment" calculation.
- Make sure the length is aligned too.
- Added new functions:
- dma_read(pos, len): returns a temporary_buffer with read data and
doesn't assume/require any alignment from either "pos" or "len".
Unlike dma_read_bulk() this function will trim the resulting buffer
to the requested size.
- dma_read_exactly(pos, len): does exactly what dma_read(pos, len) does,
but it will also throw an exception if it failed to read the required
number of bytes (e.g. EOF is reached).
- Changed the names of the parameters of dma_read(pos, buf, len) in order
to emphasize that they have to be aligned.
- Added a description to dma_read(pos, buf, len) to make it even clearer.

Signed-off-by: Vlad Zolotarov <vl...@cloudius-systems.com>

---
diff --git a/core/file.hh b/core/file.hh
--- a/core/file.hh
+++ b/core/file.hh
@@ -24,7 +24,11 @@

#include "stream.hh"
#include "sstring.hh"
+#include "core/shared_ptr.hh"
+#include "core/align.hh"
+#include "core/future-util.hh"
#include <experimental/optional>
+#include <system_error>
#include <sys/stat.h>
#include <sys/ioctl.h>
#include <linux/fs.h>
@@ -110,12 +114,93 @@ class file {
std::unique_ptr<file_impl> _file_impl;
private:
explicit file(int fd) : _file_impl(make_file_impl(fd)) {}
+
public:
file(file&& x) : _file_impl(std::move(x._file_impl)) {}
file& operator=(file&& x) noexcept = default;
+
+ // O_DIRECT reading requires that buffer, offset, and read length, are
+ // all aligned. Alignment of 4096 was necessary in the past, but no
longer
+ // is - 512 is usually enough; But we'll need to use BLKSSZGET ioctl to
+ // be sure it is really enough on this filesystem. 4096 is always safe.
+ // In addition, if we start reading in things outside page boundaries,
+ // we will end up with various pages around, some of them with
+ // overlapping ranges. Those would be very challenging to cache.
+ static constexpr uint64_t dma_alignment = 4096;
+
+ // Make sure alignment is a power of 2
+ static_assert(dma_alignment > 0 && !(dma_alignment & (dma_alignment -
1)),
+ "dma_alignment must be a power of 2");
+
+
+ /**
+ * Perform a single DMA read operation.
+ *
+ * @param aligned_pos offset to begin reading at (should be aligned)
+ * @param aligned_buffer output buffer (should be aligned)
+ * @param aligned_len number of bytes to read (should be aligned)
+ *
+ * Alignment is HW dependent but use 4KB alignment to be on the safe
side as
+ * explained above.
+ *
+ * @return number of bytes actually read
+ * @throw exception in case of I/O error
+ */
+ template <typename CharType>
+ future<size_t>
+ dma_read(uint64_t aligned_pos, CharType* aligned_buffer, size_t
aligned_len) {
+ return _file_impl->read_dma(aligned_pos, aligned_buffer,
aligned_len);
+ }
+
+ /**
+ * Read the requested amount of bytes starting from the given offset.
+ *
+ * @param pos offset to begin reading from
+ * @param len number of bytes to read
+ *
+ * @return temporary buffer containing the requested data.
+ * @throw exception in case of I/O error
+ *
+ * This function doesn't require any alignment for both "pos" and "len"
+ *
+ * @note size of the returned buffer may be smaller than "len" if EOF
is
+ * reached or in case of I/O error.
+ */
+ template <typename CharType>
+ future<temporary_buffer<CharType>> dma_read(uint64_t pos, size_t len) {
+ return dma_read_bulk<CharType>(pos, len).then(
+ [len] (temporary_buffer<CharType> buf) {
+ if (len < buf.size()) {
+ buf.trim(len);
+ }
+
+ return std::move(buf);
+ });
+ }
+
+ class eof_error : public std::exception {};
+
+ /**
+ * Read the exact amount of bytes.
+ *
+ * @param pos offset in a file to begin reading from
+ * @param len number of bytes to read
+ *
+ * @return temporary buffer containing the read data
+ * @throw eof_error if EOF is reached, file_io_error or
+ * std::system_error in case of I/O error.
+ */
template <typename CharType>
- future<size_t> dma_read(uint64_t pos, CharType* buffer, size_t len) {
- return _file_impl->read_dma(pos, buffer, len);
+ future<temporary_buffer<CharType>>
+ dma_read_exactly(uint64_t pos, size_t len) {
+ return dma_read<CharType>(pos, len).then(
+ [pos, len] (auto buf) {
+ if (buf.size() < len) {
+ throw eof_error();
+ }
+
+ return std::move(buf);
+ });
}

future<size_t> dma_read(uint64_t pos, std::vector<iovec> iov) {
@@ -155,6 +240,201 @@ public:
return _file_impl->list_directory(std::move(next));
}

+private:
+ template <typename CharType>
+ struct read_state {
+ typedef temporary_buffer<CharType> tmp_buf_type;
+
+ read_state(uint64_t offset, uint64_t front, size_t to_read)
+ : buf(tmp_buf_type::aligned(dma_alignment,
+ align_up(to_read, dma_alignment)))
+ , _offset(offset)
+ , _to_read(to_read)
+ , _front(front) {}
+
+ bool done() const {
+ return eof || pos >= _to_read;
+ }
+
+ /**
+ * Trim the buffer to the actual number of read bytes and cut the
+ * bytes from offset 0 till "_front".
+ *
+ * @note this function has to be called only if we read bytes
beyond
+ * "_front".
+ */
+ void trim_buf_before_ret() {
+ assert(have_good_bytes());
+
+ buf.trim(pos);
+ buf.trim_front(_front);
+ }
+
+ uint64_t cur_offset() const {
+ return _offset + pos;
+ }
+
+ size_t left_space() const {
+ return buf.size() - pos;
+ }
+
+ size_t left_to_read() const {
+ // positive as long as (done() == false)
+ return _to_read - pos;
+ }
+
+ void append_new_data(tmp_buf_type& new_data) {
+ auto to_copy = std::min(left_space(), new_data.size());
+
+ std::memcpy(buf.get_write() + pos, new_data.get(), to_copy);
+ pos += to_copy;
+ }
+
+ bool have_good_bytes() const {
+ return pos > _front;
+ }
+
+ public:
+ bool eof = false;
+ tmp_buf_type buf;
+ size_t pos = 0;
+ private:
+ uint64_t _offset;
+ size_t _to_read;
+ uint64_t _front;
+ };
+
+public:
+ /**
+ * Read a data bulk containing the provided addresses range that
starts at
+ * the given offset and ends at either the address aligned to
+ * dma_alignment (4KB) or at the file end.
+ *
+ * @param offset starting address of the range the read bulk should
contain
+ * @param range_size size of the addresses range
+ *
+ * @return temporary buffer containing the read data bulk.
+ * @throw system_error exception in case of I/O error or eof_error when
+ * "offset" is beyond EOF.
+ */
+ template <typename CharType>
+ future<temporary_buffer<CharType>>
+ dma_read_bulk(uint64_t offset, size_t range_size) {
+ using tmp_buf_type = typename read_state<CharType>::tmp_buf_type;
+
+ auto front = offset & (dma_alignment - 1);
+ offset -= front;
+ range_size += front;
+
+ auto rstate = make_lw_shared<read_state<CharType>>(offset, front,
+ range_size);
+
+ //
+ // First, try to read directly into the buffer. Most of the reads
will
+ // end here.
+ //
+ auto read = dma_read(offset, rstate->buf.get_write(),
+ rstate->buf.size());
+
+ return read.then([rstate, this] (size_t size) mutable {
+ rstate->pos = size;
+
+ //
+ // If we haven't read all required data at once -
+ // start read-copy sequence. We can't continue with direct
reads
+ // into the previously allocated buffer here since we have to
ensure
+ // the aligned read length and thus the aligned destination
buffer
+ // size.
+ //
+ // The copying will actually take place only when we either
reach
+ // EOF or in case of I/O errors, thus this should not happen a
lot.
+ //
+ return do_until(
+ [rstate] { return rstate->done(); },
+ [rstate, this] () mutable {
+ return read_maybe_eof<CharType>(
+ rstate->cur_offset(),
rstate->left_to_read()).then_wrapped(
+ [rstate] (auto f) mutable {
+ try {
+ auto buf1 = std::get<0>(f.get());
+ rstate->append_new_data(buf1);
+
+ return make_ready_future<>();
+ } catch (eof_error& e) {
+ if (rstate->have_good_bytes()){
+ rstate->eof = true;
+
+ return make_ready_future<>();
+ } else {
+ throw;
+ }
+ }
+ });
+ }).then_wrapped([rstate] (auto f) mutable {
+ f.get();
+ //
+ // If we are here we are promised to have read some bytes
beyond
+ // "front" so we may trim straight away.
+ //
+ rstate->trim_buf_before_ret();
+ return
make_ready_future<tmp_buf_type>(std::move(rstate->buf));
+ });
+ });
+ }
+
+private:
+ /**
+ * Try to read from the given position where the previous short read
has
+ * stopped. Check the EOF condition.
+ *
+ * The below code assumes the following: short reads due to I/O errors
+ * always end at address aligned to HW block boundary. Therefore if we
issue
+ * a new read operation from the next position we are promised to get
an
+ * error (different from EINVAL). If we've got a short read because we
have
+ * reached EOF then the above read would either return a zero-length
success
+ * (if the file size is aligned to HW block size) or an EINVAL error
(if
+ * file length is not aligned to HW block size).
+ *
+ * @param pos offset to read from
+ * @param len number of bytes to read
+ *
+ * @return temporary buffer with read data
+ * @throw appropriate exception in case of I/O error or when "pos" is
beyond
+ * EOF.
+ */
+ template <typename CharType>
+ future<temporary_buffer<CharType>>
+ read_maybe_eof(uint64_t pos, size_t len) {
+ //
+ // We have to allocate a new aligned buffer to make sure we don't
get
+ // an EINVAL error due to unaligned destination buffer.
+ //
+ temporary_buffer<CharType> buf =
temporary_buffer<CharType>::aligned(
+ dma_alignment, align_up(len, dma_alignment));
+
+ // try to read a single bulk from the given position
+ return dma_read(pos, buf.get_write(), buf.size()).then_wrapped(
+ [buf = std::move(buf)](future<size_t> f) mutable {
+ try {
+ size_t size = std::get<0>(f.get());
+
+ if (!size) {
+ throw eof_error();
+ } else {
+ buf.trim(size);
+
+ return std::move(buf);
+ }
+ } catch (std::system_error& e) {
+ if (e.code().value() == EINVAL) {
+ throw eof_error();
+ } else {
+ throw;
+ }
+ }
+ });
+ }
+
friend class reactor;
};

diff --git a/core/fstream.cc b/core/fstream.cc
--- a/core/fstream.cc
+++ b/core/fstream.cc
@@ -27,30 +27,39 @@
class file_data_source_impl : public data_source_impl {
lw_shared_ptr<file> _file;
uint64_t _pos;
+ std::experimental::optional<size_t> _fsize;
size_t _buffer_size;
+private:
+ // Should be called only when _fsize is initialized
+ future<temporary_buffer<char>> do_get() {
+ using buf_type = temporary_buffer<char>;
+
+ if (_pos >= _fsize.value()) {
+ return make_ready_future<buf_type>(std::move(buf_type(0)));
+ }
+
+ return _file->dma_read_bulk<char>(_pos, _buffer_size).then(
+ [this] (buf_type buf) {
+ _pos += buf.size();
+
+ return std::move(buf);
+ });
+ }
public:
file_data_source_impl(lw_shared_ptr<file> f, uint64_t pos, size_t
buffer_size)
: _file(std::move(f)), _pos(pos), _buffer_size(buffer_size) {}
+
virtual future<temporary_buffer<char>> get() override {
- // must align allocation for dma
- auto alignment = std::min<size_t>(_buffer_size, 4096);
- auto buf = temporary_buffer<char>::aligned(alignment,
_buffer_size);
- auto q = buf.get_write(); // alive while "buf" is kept alive
- auto old_pos = _pos;
- // dma_read needs to be aligned. It doesn't have to be
page-aligned,
- // though, and we could get away with something much smaller.
However, if
- // we start reading in things outside page boundaries, we will end
up with
- // various pages around, some of them with overlapping ranges.
Those would
- // be very challenging to cache.
- old_pos &= ~4095;
- auto front = _pos - old_pos;
- _pos += _buffer_size - front;
- return _file->dma_read(old_pos, q, _buffer_size).then(
- [buf = std::move(buf), front] (size_t size) mutable {
- buf.trim(size);
- buf.trim_front(front);
- return
make_ready_future<temporary_buffer<char>>(std::move(buf));
- });
+ if (!_fsize){
+ return _file->size().then(
+ [this] (size_t fsize) {
+ _fsize = fsize;
+
+ return do_get();
+ });
+ }
+
+ return do_get();
}
};

Reply all
Reply to author
Forward
0 new messages