[PATCH v1 00/11] co-routinize most of the CQL transport code

0 views
Skip to first unread message

Gleb Natapov

<gleb@scylladb.com>
unread,
Jul 9, 2024, 7:32:28 AM (8 days ago) Jul 9
to scylladb-dev@googlegroups.com

Also in scylla-dev gleb/coroutinize-transport

CI: https://jenkins.scylladb.com/job/scylla-master/job/scylla-ci/10254/

Gleb Natapov (11):
transport: co-routinize cql_server::connection::process
transport: co-routinize cql_server::connection::process_on_shard
transport: co-routinize process_query_internal
transport: co-routinize cql_server::connection::process_prepare
transport: co-routinize process_execute_internal
transport: co-routinize process_batch_internal
transport: co-routinize cql_server::connection::read_frame
transport: co-routinize cql_server::connection::process_request
transport: co-routinize
cql_server::connection::read_and_decompress_frame
transport: co-routinize cql_server::connection::process_auth_response
topology coordinator: exit instead of abort on
crash_coordinator_before_stream error injection

service/topology_coordinator.cc | 2 +-
transport/server.cc | 515 +++++++++++++++-----------------
2 files changed, 246 insertions(+), 271 deletions(-)

--
2.45.2

Gleb Natapov

<gleb@scylladb.com>
unread,
Jul 9, 2024, 7:32:28 AM (8 days ago) Jul 9
to scylladb-dev@googlegroups.com
---
transport/server.cc | 248 ++++++++++++++++++++++----------------------
1 file changed, 123 insertions(+), 125 deletions(-)

diff --git a/transport/server.cc b/transport/server.cc
index a20a16e6ea8..cc3c64c3c8a 100644
--- a/transport/server.cc
+++ b/transport/server.cc
@@ -657,143 +657,141 @@ void cql_server::connection::handle_error(future<>&& f) {
}

future<> cql_server::connection::process_request() {
- return read_frame().then_wrapped([this] (future<std::optional<cql_binary_frame_v3>>&& v) {
- auto maybe_frame = v.get();
- if (!maybe_frame) {
- // eof
- return make_ready_future<>();
+ std::optional<cql_binary_frame_v3> maybe_frame = co_await read_frame();
+
+ if (!maybe_frame) {
+ // eof
+ co_return;
+ }
+
+ auto& f = *maybe_frame;
+ auto op = f.opcode;
+ auto stream = f.stream;
+ auto length = f.length;
+ auto flags = f.flags;
+
+ const bool allow_shedding = _client_state.get_workload_type() == service::client_state::workload_type::interactive;
+ if (allow_shedding && _shed_incoming_requests) {
+ ++_server._stats.requests_shed;
+ co_await _read_buf.skip(length);
+ const char* message = "request shed due to coordinator overload";
+ clogger.debug("{}: {}, stream {}", _client_state.get_remote_address(), message, uint16_t(stream));
+ write_response(make_error(stream, exceptions::exception_code::OVERLOADED, message, tracing::trace_state_ptr()));
+ co_return;
+ }
+
+ tracing_request_type tracing_requested = tracing_request_type::not_requested;
+ if (f.flags & cql_frame_flags::tracing) {
+ // If tracing is requested for a specific CQL command - flush
+ // tracing info right after the command is over.
+ tracing_requested = tracing_request_type::write_on_close;
+ } else if (tracing::tracing::get_local_tracing_instance().trace_next_query()) {
+ tracing_requested = tracing_request_type::no_write_on_close;
+ }
+
+ auto mem_estimate = length * 2 + 8000; // Allow for extra copies and bookkeeping
+ if (mem_estimate > _server._max_request_size) {
+ const auto message = format("request size too large (frame size {:d}; estimate {:d}; allowed {:d})",
+ uint32_t(length), mem_estimate, _server._max_request_size);
+ clogger.debug("{}: {}, request dropped", _client_state.get_remote_address(), message);
+ write_response(make_error(stream, exceptions::exception_code::INVALID, message, tracing::trace_state_ptr()));
+ co_await std::exchange(_ready_to_respond, make_ready_future<>());
+ co_await _read_buf.close();
+ co_return co_await util::skip_entire_stream(_read_buf);
+ }
+
+ if (_server._stats.requests_serving > _server._max_concurrent_requests) {
+ ++_server._stats.requests_shed;
+ co_await _read_buf.skip(length);
+ const auto message = format("too many in-flight requests (configured via max_concurrent_requests_per_shard): {}",
+ _server._stats.requests_serving);
+ clogger.debug("{}: {}, request dropped", _client_state.get_remote_address(), message);
+ write_response(make_error(stream, exceptions::exception_code::OVERLOADED, message, tracing::trace_state_ptr()));
+ co_return;
+ }
+
+ const auto shedding_timeout = std::chrono::milliseconds(50);
+ auto fut = allow_shedding ? get_units(_server._memory_available, mem_estimate, shedding_timeout) :
+ get_units(_server._memory_available, mem_estimate);
+
+ if (_server._memory_available.waiters()) {
+ if (allow_shedding && !_shedding_timer.armed()) {
+ _shedding_timer.arm(shedding_timeout);
}
+ ++_server._stats.requests_blocked_memory;
+ }

- auto& f = *maybe_frame;
-
- const bool allow_shedding = _client_state.get_workload_type() == service::client_state::workload_type::interactive;
- if (allow_shedding && _shed_incoming_requests) {
- ++_server._stats.requests_shed;
- return _read_buf.skip(f.length).then([this, stream = f.stream] {
- const char* message = "request shed due to coordinator overload";
- clogger.debug("{}: {}, stream {}", _client_state.get_remote_address(), message, uint16_t(stream));
- write_response(make_error(stream, exceptions::exception_code::OVERLOADED,
- message, tracing::trace_state_ptr()));
- return make_ready_future<>();
- });
- }
+ bool timeout_err = false;
+ semaphore_units<> units;
+ try {
+ units = co_await std::move(fut);
+ } catch (semaphore_timed_out&) {
+ timeout_err = true;
+ }

- tracing_request_type tracing_requested = tracing_request_type::not_requested;
- if (f.flags & cql_frame_flags::tracing) {
- // If tracing is requested for a specific CQL command - flush
- // tracing info right after the command is over.
- tracing_requested = tracing_request_type::write_on_close;
- } else if (tracing::tracing::get_local_tracing_instance().trace_next_query()) {
- tracing_requested = tracing_request_type::no_write_on_close;
+ if (timeout_err) {
+ if (allow_shedding) {
+ // Cancel shedding in case no more requests are going to do that on completion
+ if (_pending_requests_gate.get_count() == 0) {
+ _shed_incoming_requests = false;
+ }
+ co_await _read_buf.skip(length);
+ co_return;
}
+ }

- auto op = f.opcode;
- auto stream = f.stream;
- auto mem_estimate = f.length * 2 + 8000; // Allow for extra copies and bookkeeping
- if (mem_estimate > _server._max_request_size) {
- const auto message = format("request size too large (frame size {:d}; estimate {:d}; allowed {:d})",
- uint32_t(f.length), mem_estimate, _server._max_request_size);
- clogger.debug("{}: {}, request dropped", _client_state.get_remote_address(), message);
- write_response(make_error(stream, exceptions::exception_code::INVALID, message, tracing::trace_state_ptr()));
- return std::exchange(_ready_to_respond, make_ready_future<>())
- .then([this] { return _read_buf.close(); })
- .then([this] { return util::skip_entire_stream(_read_buf); });
- }
+ service_permit mem_permit = make_service_permit(std::move(units));

- if (_server._stats.requests_serving > _server._max_concurrent_requests) {
- ++_server._stats.requests_shed;
- return _read_buf.skip(f.length).then([this, stream = f.stream] {
- const auto message = format("too many in-flight requests (configured via max_concurrent_requests_per_shard): {}",
- _server._stats.requests_serving);
- clogger.debug("{}: {}, request dropped", _client_state.get_remote_address(), message);
- write_response(make_error(stream, exceptions::exception_code::OVERLOADED,
- message,
- tracing::trace_state_ptr()));
- return make_ready_future<>();
- });
- }
+ fragmented_temporary_buffer buf = co_await read_and_decompress_frame(length, flags);

- const auto shedding_timeout = std::chrono::milliseconds(50);
- auto fut = allow_shedding
- ? get_units(_server._memory_available, mem_estimate, shedding_timeout).then_wrapped([this, length = f.length] (auto f) {
- try {
- return make_ready_future<semaphore_units<>>(f.get());
- } catch (semaphore_timed_out& sto) {
- // Cancel shedding in case no more requests are going to do that on completion
- if (_pending_requests_gate.get_count() == 0) {
- _shed_incoming_requests = false;
- }
- return _read_buf.skip(length).then([sto = std::move(sto)] () mutable {
- return make_exception_future<semaphore_units<>>(std::move(sto));
- });
- }
- })
- : get_units(_server._memory_available, mem_estimate);
- if (_server._memory_available.waiters()) {
- if (allow_shedding && !_shedding_timer.armed()) {
- _shedding_timer.arm(shedding_timeout);
- }
- ++_server._stats.requests_blocked_memory;
- }
+ ++_server._stats.requests_served;
+ ++_server._stats.requests_serving;

- return fut.then_wrapped([this, length = f.length, flags = f.flags, op, stream, tracing_requested] (auto mem_permit_fut) {
- if (mem_permit_fut.failed()) {
- // Ignore semaphore errors - they are expected if load shedding took place
- mem_permit_fut.ignore_ready_future();
- return make_ready_future<>();
- }
- semaphore_units<> mem_permit = mem_permit_fut.get();
- return this->read_and_decompress_frame(length, flags).then([this, op, stream, tracing_requested, mem_permit = make_service_permit(std::move(mem_permit))] (fragmented_temporary_buffer buf) mutable {
-
- ++_server._stats.requests_served;
- ++_server._stats.requests_serving;
-
- _pending_requests_gate.enter();
- auto leave = defer([this] {
- _shedding_timer.cancel();
- _shed_incoming_requests = false;
- _pending_requests_gate.leave();
- });
- auto istream = buf.get_istream();
-
-
- // Parallelize only the performance sensitive requests:
- // QUERY, PREPARE, EXECUTE, BATCH
- bool should_paralelize = (op == uint8_t(cql_binary_opcode::QUERY) ||
- op == uint8_t(cql_binary_opcode::PREPARE) ||
- op == uint8_t (cql_binary_opcode::EXECUTE) ||
- op == uint8_t(cql_binary_opcode::BATCH));
-
- future<foreign_ptr<std::unique_ptr<cql_server::response>>> request_process_future = should_paralelize ?
- _process_request_stage(this, istream, op, stream, seastar::ref(_client_state), tracing_requested, mem_permit) :
- process_request_one(istream, op, stream, seastar::ref(_client_state), tracing_requested, mem_permit);
-
- future<> request_response_future = request_process_future.then_wrapped([this, buf = std::move(buf), mem_permit, leave = std::move(leave), stream] (future<foreign_ptr<std::unique_ptr<cql_server::response>>> response_f) mutable {
- try {
- if (response_f.failed()) {
- const auto message = format("request processing failed, error [{}]", response_f.get_exception());
- clogger.error("{}: {}", _client_state.get_remote_address(), message);
- write_response(make_error(stream, exceptions::exception_code::SERVER_ERROR,
- message,
- tracing::trace_state_ptr()));
- } else {
- write_response(response_f.get(), std::move(mem_permit), _compression);
- }
- _ready_to_respond = _ready_to_respond.finally([leave = std::move(leave)] {});
- } catch (...) {
- clogger.error("{}: request processing failed: {}",
- _client_state.get_remote_address(), std::current_exception());
- }
- });
+ _pending_requests_gate.enter();
+ auto leave = defer([this] {
+ _shedding_timer.cancel();
+ _shed_incoming_requests = false;
+ _pending_requests_gate.leave();
+ });
+
+ // Parallelize only the performance sensitive requests:
+ // QUERY, PREPARE, EXECUTE, BATCH
+ bool should_paralelize = (op == uint8_t(cql_binary_opcode::QUERY) ||
+ op == uint8_t(cql_binary_opcode::PREPARE) ||
+ op == uint8_t (cql_binary_opcode::EXECUTE) ||
+ op == uint8_t(cql_binary_opcode::BATCH));
+
+ auto do_request = [this, stream, mem_permit] (future<foreign_ptr<std::unique_ptr<cql_server::response>>> f) mutable -> future<> {
+ std::exception_ptr ex;
+ foreign_ptr<std::unique_ptr<cql_server::response>> res;
+ try {
+ res = co_await std::move(f);
+ } catch (...) {
+ ex = std::current_exception();
+ }

- if (should_paralelize) {
- return make_ready_future<>();
+ try {
+ if (ex) {
+ const auto message = format("request processing failed, error [{}]", ex);
+ clogger.error("{}: {}", _client_state.get_remote_address(), message);
+ write_response(make_error(stream, exceptions::exception_code::SERVER_ERROR, message, tracing::trace_state_ptr()));
} else {
- return request_response_future;
+ write_response(std::move(res), std::move(mem_permit), _compression);
}
- });
+ } catch (...) {
+ clogger.error("{}: request processing failed: {}", _client_state.get_remote_address(), std::current_exception());
+ }
+ co_return;
+ };
+
+ if (should_paralelize) {
+ // Waited through _pending_requests_gate
+ (void)do_with(std::move(do_request), std::move(leave), std::move(buf), [this, op, stream, mem_permit, tracing_requested] (auto& func, auto& leave, auto& buf) mutable {
+ return func(_process_request_stage(this, buf.get_istream(), op, stream, seastar::ref(_client_state), tracing_requested, std::move(mem_permit)));
});
- });
+ } else {
+ co_await do_request(process_request_one(buf.get_istream(), op, stream, seastar::ref(_client_state), tracing_requested, std::move(mem_permit)));
+ }
}

// Contiguous buffers for use with compression primitives.
--
2.45.2

Gleb Natapov

<gleb@scylladb.com>
unread,
Jul 9, 2024, 7:32:30 AM (8 days ago) Jul 9
to scylladb-dev@googlegroups.com
---
transport/server.cc | 70 ++++++++++++++++++++++-----------------------
1 file changed, 34 insertions(+), 36 deletions(-)

diff --git a/transport/server.cc b/transport/server.cc
index cc3c64c3c8a..84e7a10375a 100644
--- a/transport/server.cc
+++ b/transport/server.cc
@@ -816,51 +816,49 @@ future<fragmented_temporary_buffer> cql_server::connection::read_and_decompress_
if (length < 4) {
throw std::runtime_error(fmt::format("CQL frame truncated: expected to have at least 4 bytes, got {}", length));
}
- return _buffer_reader.read_exactly(_read_buf, length).then([] (fragmented_temporary_buffer buf) {
- auto input_buffer = input_buffer_guard();
- auto output_buffer = output_buffer_guard();
- auto v = fragmented_temporary_buffer::view(buf);
- int32_t uncomp_len = read_simple<int32_t>(v);
- if (uncomp_len < 0) {
- throw std::runtime_error("CQL frame uncompressed length is negative: " + std::to_string(uncomp_len));
+ auto buf = co_await _buffer_reader.read_exactly(_read_buf, length);
+ auto input_buffer = input_buffer_guard();
+ auto output_buffer = output_buffer_guard();
+ auto v = fragmented_temporary_buffer::view(buf);
+ int32_t uncomp_len = read_simple<int32_t>(v);
+ if (uncomp_len < 0) {
+ throw std::runtime_error("CQL frame uncompressed length is negative: " + std::to_string(uncomp_len));
+ }
+ auto in = input_buffer.get_linearized_view(v);
+ co_return output_buffer.make_fragmented_temporary_buffer(uncomp_len, [&in] (bytes_mutable_view out) {
+ auto ret = LZ4_decompress_safe(reinterpret_cast<const char*>(in.data()), reinterpret_cast<char*>(out.data()), in.size(), out.size());
+ if (ret < 0) {
+ throw std::runtime_error("CQL frame LZ4 uncompression failure");
}
- auto in = input_buffer.get_linearized_view(v);
- return output_buffer.make_fragmented_temporary_buffer(uncomp_len, [&in] (bytes_mutable_view out) {
- auto ret = LZ4_decompress_safe(reinterpret_cast<const char*>(in.data()), reinterpret_cast<char*>(out.data()), in.size(), out.size());
- if (ret < 0) {
- throw std::runtime_error("CQL frame LZ4 uncompression failure");
- }
- if (static_cast<size_t>(ret) != out.size()) { // ret is known to be positive here
- throw std::runtime_error("Malformed CQL frame - provided uncompressed size different than real uncompressed size");
- }
- return static_cast<size_t>(ret);
- });
+ if (static_cast<size_t>(ret) != out.size()) { // ret is known to be positive here
+ throw std::runtime_error("Malformed CQL frame - provided uncompressed size different than real uncompressed size");
+ }
+ return static_cast<size_t>(ret);
});
} else if (_compression == cql_compression::snappy) {
- return _buffer_reader.read_exactly(_read_buf, length).then([] (fragmented_temporary_buffer buf) {
- auto input_buffer = input_buffer_guard();
- auto output_buffer = output_buffer_guard();
- auto in = input_buffer.get_linearized_view(fragmented_temporary_buffer::view(buf));
- size_t uncomp_len;
- if (snappy_uncompressed_length(reinterpret_cast<const char*>(in.data()), in.size(), &uncomp_len) != SNAPPY_OK) {
- throw std::runtime_error("CQL frame Snappy uncompressed size is unknown");
+ auto buf = co_await _buffer_reader.read_exactly(_read_buf, length);
+ auto input_buffer = input_buffer_guard();
+ auto output_buffer = output_buffer_guard();
+ auto in = input_buffer.get_linearized_view(fragmented_temporary_buffer::view(buf));
+ size_t uncomp_len;
+ if (snappy_uncompressed_length(reinterpret_cast<const char*>(in.data()), in.size(), &uncomp_len) != SNAPPY_OK) {
+ throw std::runtime_error("CQL frame Snappy uncompressed size is unknown");
+ }
+ co_return output_buffer.make_fragmented_temporary_buffer(uncomp_len, [&in] (bytes_mutable_view out) {
+ size_t output_len = out.size();
+ if (snappy_uncompress(reinterpret_cast<const char*>(in.data()), in.size(), reinterpret_cast<char*>(out.data()), &output_len) != SNAPPY_OK) {
+ throw std::runtime_error("CQL frame Snappy uncompression failure");
}
- return output_buffer.make_fragmented_temporary_buffer(uncomp_len, [&in] (bytes_mutable_view out) {
- size_t output_len = out.size();
- if (snappy_uncompress(reinterpret_cast<const char*>(in.data()), in.size(), reinterpret_cast<char*>(out.data()), &output_len) != SNAPPY_OK) {
- throw std::runtime_error("CQL frame Snappy uncompression failure");
- }
- if (output_len != out.size()) {
- throw std::runtime_error("Malformed CQL frame - provided uncompressed size different than real uncompressed size");
- }
- return output_len;
- });
+ if (output_len != out.size()) {
+ throw std::runtime_error("Malformed CQL frame - provided uncompressed size different than real uncompressed size");
+ }
+ return output_len;
});
} else {
throw exceptions::protocol_exception(format("Unknown compression algorithm"));
}
}
- return _buffer_reader.read_exactly(_read_buf, length);
+ co_return co_await _buffer_reader.read_exactly(_read_buf, length);
}

future<std::unique_ptr<cql_server::response>> cql_server::connection::process_startup(uint16_t stream, request_reader in, service::client_state& client_state,
--
2.45.2

Gleb Natapov

<gleb@scylladb.com>
unread,
Jul 9, 2024, 7:32:31 AM (8 days ago) Jul 9
to scylladb-dev@googlegroups.com
---
transport/server.cc | 20 +++++++++-----------
1 file changed, 9 insertions(+), 11 deletions(-)

diff --git a/transport/server.cc b/transport/server.cc
index c5ff73eafb8..225267d6c37 100644
--- a/transport/server.cc
+++ b/transport/server.cc
@@ -1239,17 +1239,15 @@ process_batch_internal(service::client_state& client_state, distributed<cql3::qu
}

auto batch = ::make_shared<cql3::statements::batch_statement>(cql3::statements::batch_statement::type(type), std::move(modifications), cql3::attributes::none(), qp.local().get_cql_stats());
- return qp.local().execute_batch_without_checking_exception_message(batch, query_state, options, std::move(pending_authorization_entries))
- .then([stream, batch, q_state = std::move(q_state), trace_state = query_state.get_trace_state(), version] (auto msg) {
- if (msg->move_to_shard()) {
- return process_fn_return_type(dynamic_pointer_cast<messages::result_message::bounce_to_shard>(msg));
- } else if (msg->is_exception()) {
- return process_fn_return_type(convert_error_message_to_coordinator_result(msg.get()));
- } else {
- tracing::trace(q_state->query_state.get_trace_state(), "Done processing - preparing a result");
- return process_fn_return_type(make_foreign(make_result(stream, *msg, trace_state, version)));
- }
- });
+ auto msg = co_await qp.local().execute_batch_without_checking_exception_message(batch, query_state, options, std::move(pending_authorization_entries));
+ if (msg->move_to_shard()) {
+ co_return process_fn_return_type(dynamic_pointer_cast<messages::result_message::bounce_to_shard>(msg));
+ } else if (msg->is_exception()) {
+ co_return process_fn_return_type(convert_error_message_to_coordinator_result(msg.get()));
+ } else {
+ tracing::trace(q_state->query_state.get_trace_state(), "Done processing - preparing a result");
+ co_return process_fn_return_type(make_foreign(make_result(stream, *msg, trace_state, version)));
+ }
}

future<cql_server::result_with_foreign_response_ptr>
--
2.45.2

Gleb Natapov

<gleb@scylladb.com>
unread,
Jul 9, 2024, 7:32:31 AM (8 days ago) Jul 9
to scylladb-dev@googlegroups.com
---
transport/server.cc | 17 ++++++-----------
1 file changed, 6 insertions(+), 11 deletions(-)

diff --git a/transport/server.cc b/transport/server.cc
index 84e7a10375a..0bf86c9fe1f 100644
--- a/transport/server.cc
+++ b/transport/server.cc
@@ -927,18 +927,13 @@ future<std::unique_ptr<cql_server::response>> cql_server::connection::process_au
auto buf = in.read_raw_bytes_view(in.bytes_left());
auto challenge = sasl_challenge->evaluate_response(buf);
if (sasl_challenge->is_complete()) {
- return sasl_challenge->get_authenticated_user().then([this, sasl_challenge, stream, &client_state, challenge = std::move(challenge), trace_state](auth::authenticated_user user) mutable {
- client_state.set_login(std::move(user));
- auto f = client_state.check_user_can_login();
- f = f.then([&client_state] {
- return client_state.maybe_update_per_service_level_params();
- });
- return f.then([this, stream, challenge = std::move(challenge), trace_state]() mutable {
- return make_ready_future<std::unique_ptr<cql_server::response>>(make_auth_success(stream, std::move(challenge), trace_state));
- });
- });
+ auto user = co_await sasl_challenge->get_authenticated_user();
+ client_state.set_login(std::move(user));
+ co_await client_state.check_user_can_login();
+ co_await client_state.maybe_update_per_service_level_params();
+ co_return make_auth_success(stream, std::move(challenge), trace_state);
}
- return make_ready_future<std::unique_ptr<cql_server::response>>(make_auth_challenge(stream, std::move(challenge), trace_state));
+ co_return make_auth_challenge(stream, std::move(challenge), trace_state);
}

future<std::unique_ptr<cql_server::response>> cql_server::connection::process_options(uint16_t stream, request_reader in, service::client_state& client_state,
--
2.45.2

Gleb Natapov

<gleb@scylladb.com>
unread,
Jul 9, 2024, 7:32:32 AM (8 days ago) Jul 9
to scylladb-dev@googlegroups.com
abort() leaves behind unneeded core dumps. Other error injections that
terminate the process use exit() as well.
---
service/topology_coordinator.cc | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/service/topology_coordinator.cc b/service/topology_coordinator.cc
index 1f6c5be5d1e..73e8573a45f 100644
--- a/service/topology_coordinator.cc
+++ b/service/topology_coordinator.cc
@@ -1798,7 +1798,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
co_await _group0.make_nonvoter(replaced_node_id, _as);
}
}
- utils::get_local_injector().inject("crash_coordinator_before_stream", [] { abort(); });
+ utils::get_local_injector().inject("crash_coordinator_before_stream", [] { _exit(1); });
raft_topology_cmd cmd{raft_topology_cmd::command::stream_ranges};
auto state = node.rs->state;
try {
--
2.45.2

Gleb Natapov

<gleb@scylladb.com>
unread,
Jul 9, 2024, 7:32:32 AM (8 days ago) Jul 9
to scylladb-dev@googlegroups.com
---
transport/server.cc | 20 +++++++++-----------
1 file changed, 9 insertions(+), 11 deletions(-)

diff --git a/transport/server.cc b/transport/server.cc
index 39bb3bc954e..c5ff73eafb8 100644
--- a/transport/server.cc
+++ b/transport/server.cc
@@ -1120,17 +1120,15 @@ process_execute_internal(service::client_state& client_state, distributed<cql3::
}

tracing::trace(trace_state, "Processing a statement");
- return qp.local().execute_prepared_without_checking_exception_message(query_state, std::move(stmt), options, std::move(prepared), std::move(cache_key), needs_authorization)
- .then([trace_state = query_state.get_trace_state(), skip_metadata, q_state = std::move(q_state), stream, version] (auto msg) {
- if (msg->move_to_shard()) {
- return process_fn_return_type(dynamic_pointer_cast<messages::result_message::bounce_to_shard>(msg));
- } else if (msg->is_exception()) {
- return process_fn_return_type(convert_error_message_to_coordinator_result(msg.get()));
- } else {
- tracing::trace(q_state->query_state.get_trace_state(), "Done processing - preparing a result");
- return process_fn_return_type(make_foreign(make_result(stream, *msg, q_state->query_state.get_trace_state(), version, skip_metadata)));
- }
- });
+ auto msg = co_await qp.local().execute_prepared_without_checking_exception_message(query_state, std::move(stmt), options, std::move(prepared), std::move(cache_key), needs_authorization);
+ if (msg->move_to_shard()) {
+ co_return process_fn_return_type(dynamic_pointer_cast<messages::result_message::bounce_to_shard>(msg));
+ } else if (msg->is_exception()) {
+ co_return process_fn_return_type(convert_error_message_to_coordinator_result(msg.get()));
+ } else {
+ tracing::trace(q_state->query_state.get_trace_state(), "Done processing - preparing a result");
+ co_return process_fn_return_type(make_foreign(make_result(stream, *msg, q_state->query_state.get_trace_state(), version, skip_metadata)));
+ }
}

future<cql_server::result_with_foreign_response_ptr> cql_server::connection::process_execute(uint16_t stream, request_reader in,
--
2.45.2

Gleb Natapov

<gleb@scylladb.com>
unread,
Jul 9, 2024, 7:32:33 AM (8 days ago) Jul 9
to scylladb-dev@googlegroups.com
---
transport/server.cc | 60 +++++++++++++++++++++------------------------
1 file changed, 28 insertions(+), 32 deletions(-)

diff --git a/transport/server.cc b/transport/server.cc
index 225267d6c37..a20a16e6ea8 100644
--- a/transport/server.cc
+++ b/transport/server.cc
@@ -367,41 +367,37 @@ cql_server::connection::read_frame() {
if (!_version) {
// We don't know the frame size before reading the first frame,
// so read just one byte, and then read the rest of the frame.
- return _read_buf.read_exactly(1).then([this] (temporary_buffer<char> buf) {
- if (buf.empty()) {
- return make_ready_future<ret_type>();
- }
- _version = buf[0];
- if (_version < 3 || _version > current_version) {
- auto client_version = _version;
- _version = current_version;
- throw exceptions::protocol_exception(format("Invalid or unsupported protocol version: {:d}", client_version));
- }
-
+ auto buf = co_await _read_buf.read_exactly(1);
+ if (buf.empty()) {
+ co_return std::nullopt;
+ }
+ _version = buf[0];
+ if (_version < 3 || _version > current_version) {
+ auto client_version = _version;
+ _version = current_version;
+ throw exceptions::protocol_exception(format("Invalid or unsupported protocol version: {:d}", client_version));
+ }

- return _read_buf.read_exactly(frame_size() - 1).then([this] (temporary_buffer<char> tail) {
- temporary_buffer<char> full(frame_size());
- full.get_write()[0] = _version;
- std::copy(tail.get(), tail.get() + tail.size(), full.get_write() + 1);
- auto frame = parse_frame(std::move(full));
- // This is the very first frame, so reject obviously incorrect frames, to
- // avoid allocating large amounts of memory for the message body
- if (frame.length > 100'000) {
- // The STARTUP message body is a [string map] containing just a few options,
- // so it should be smaller that 100kB. See #4366.
- throw exceptions::protocol_exception(format("Initial message size too large ({:d}), rejecting as invalid", uint32_t(frame.length)));
- }
- return make_ready_future<ret_type>(frame);
- });
- });
+ auto tail = co_await _read_buf.read_exactly(frame_size() - 1);
+ temporary_buffer<char> full(frame_size());
+ full.get_write()[0] = _version;
+ std::copy(tail.get(), tail.get() + tail.size(), full.get_write() + 1);
+ auto frame = parse_frame(std::move(full));
+ // This is the very first frame, so reject obviously incorrect frames, to
+ // avoid allocating large amounts of memory for the message body
+ if (frame.length > 100'000) {
+ // The STARTUP message body is a [string map] containing just a few options,
+ // so it should be smaller that 100kB. See #4366.
+ throw exceptions::protocol_exception(format("Initial message size too large ({:d}), rejecting as invalid", uint32_t(frame.length)));
+ }
+ co_return ret_type(frame);
} else {
// Not the first frame, so we know the size.
- return _read_buf.read_exactly(frame_size()).then([this] (temporary_buffer<char> buf) {
- if (buf.empty()) {
- return make_ready_future<ret_type>();
- }
- return make_ready_future<ret_type>(parse_frame(std::move(buf)));
- });
+ auto buf = co_await _read_buf.read_exactly(frame_size());
+ if (buf.empty()) {
+ co_return std::nullopt;
+ }
+ co_return ret_type(parse_frame(std::move(buf)));
}
}

--
2.45.2

Avi Kivity

<avi@scylladb.com>
unread,
Jul 9, 2024, 8:22:49 AM (8 days ago) Jul 9
to Gleb Natapov, scylladb-dev@googlegroups.com
This needs a benchmark.

Gleb Natapov

<gleb@scylladb.com>
unread,
Jul 9, 2024, 8:25:20 AM (8 days ago) Jul 9
to Avi Kivity, scylladb-dev@googlegroups.com
On Tue, Jul 09, 2024 at 03:22:43PM +0300, Avi Kivity wrote:
> This needs a benchmark.
>
What kind of benchmark this needs?
--
Gleb.

Avi Kivity

<avi@scylladb.com>
unread,
Jul 9, 2024, 8:32:05 AM (8 days ago) Jul 9
to Gleb Natapov, scylladb-dev@googlegroups.com
This is likely a regression. If a packet contains multiple frames, then before the patch, the first read_exactly() will allocate a continuation, but the rest of the frames will be returned immediately. After the patch, each CQL frame will allocate a coroutine frame.


 
--
2.45.2


Avi Kivity

<avi@scylladb.com>
unread,
Jul 9, 2024, 8:42:33 AM (8 days ago) Jul 9
to Gleb Natapov, scylladb-dev@googlegroups.com
On Tue, 2024-07-09 at 14:31 +0300, 'Gleb Natapov' via ScyllaDB development wrote:
I don't like this extra coroutine, but using continuations here won't help.


+
+    if (should_paralelize) {
+        // Waited through _pending_requests_gate
+        (void)do_with(std::move(do_request), std::move(leave), std::move(buf), [this, op, stream, mem_permit, tracing_requested] (auto& func, auto& leave, auto& buf) mutable {
+            return func(_process_request_stage(this, buf.get_istream(), op, stream, seastar::ref(_client_state), tracing_requested, std::move(mem_permit)));
         });


Can we avoid the do_with? For example, make all its contents parameters to func so func will be responsible for keeping them alive.

Avi Kivity

<avi@scylladb.com>
unread,
Jul 9, 2024, 8:43:45 AM (8 days ago) Jul 9
to Gleb Natapov, scylladb-dev@googlegroups.com
On Tue, 2024-07-09 at 14:31 +0300, 'Gleb Natapov' via ScyllaDB development wrote:
I'm guessing that this is also a regression wrt adding allocations when several frames are in a packet.

Raphael S. Carvalho

<raphaelsc@scylladb.com>
unread,
Jul 9, 2024, 8:43:52 AM (8 days ago) Jul 9
to Avi Kivity, Gleb Natapov, scylladb-dev@googlegroups.com
Avi, this sounds like good material for dev documentation. It's not
clear to everyone. If there's not an alternative that can match the
allocation count with continuation, then I guess people need to know
when to use continuation instead of coroutine.

>
>
>
> --
> 2.45.2
>
>
> --
> You received this message because you are subscribed to the Google Groups "ScyllaDB development" group.
> To unsubscribe from this group and stop receiving emails from it, send an email to scylladb-dev...@googlegroups.com.
> To view this discussion on the web visit https://groups.google.com/d/msgid/scylladb-dev/6257ecc27bf0e948ad730353a002651f08a1d71a.camel%40scylladb.com.

Avi Kivity

<avi@scylladb.com>
unread,
Jul 9, 2024, 8:46:01 AM (8 days ago) Jul 9
to Gleb Natapov, scylladb-dev@googlegroups.com
On Tue, 2024-07-09 at 15:25 +0300, Gleb Natapov wrote:
On Tue, Jul 09, 2024 at 03:22:43PM +0300, Avi Kivity wrote:
This needs a benchmark.

What kind of benchmark this needs?



Run cassandra-stress against a single-shard single-partition cache-only server/workload. Run 100M reads (with some concurrency). Divide allocations by requests served and report.

Gleb Natapov

<gleb@scylladb.com>
unread,
Jul 9, 2024, 8:47:58 AM (8 days ago) Jul 9
to Avi Kivity, scylladb-dev@googlegroups.com
Makes sense. Where do I get the allocation counter?

--
Gleb.

Avi Kivity

<avi@scylladb.com>
unread,
Jul 9, 2024, 9:25:38 AM (8 days ago) Jul 9
to Gleb Natapov, scylladb-dev@googlegroups.com
curl -q http://ip.address:9180/metrics | grep allocation

Kamil Braun

<kbraun@scylladb.com>
unread,
Jul 10, 2024, 12:29:55 PM (7 days ago) Jul 10
to Raphael S. Carvalho, Avi Kivity, Gleb Natapov, scylladb-dev@googlegroups.com
Please send a PR, or (the lazy option) open an issue (referencing this
patchset in the issue).

Kamil Braun

<kbraun@scylladb.com>
unread,
Jul 10, 2024, 12:31:56 PM (7 days ago) Jul 10
to Avi Kivity, Gleb Natapov, scylladb-dev@googlegroups.com
Suggestion to have the cake and eat it too: move the coroutinized
version of the first branch to a separate function (e.g.
`read_first_frame()`), leave this function in continuation style

>
>
>>
>> --
>> 2.45.2
>>
>
> --
> You received this message because you are subscribed to the Google
> Groups "ScyllaDB development" group.
> To unsubscribe from this group and stop receiving emails from it, send
> an email to scylladb-dev...@googlegroups.com
> <mailto:scylladb-dev...@googlegroups.com>.
> To view this discussion on the web visit
> https://groups.google.com/d/msgid/scylladb-dev/6257ecc27bf0e948ad730353a002651f08a1d71a.camel%40scylladb.com <https://groups.google.com/d/msgid/scylladb-dev/6257ecc27bf0e948ad730353a002651f08a1d71a.camel%40scylladb.com?utm_medium=email&utm_source=footer>.

Kamil Braun

<kbraun@scylladb.com>
unread,
Jul 10, 2024, 12:33:08 PM (7 days ago) Jul 10
to Avi Kivity, Gleb Natapov, scylladb-dev@googlegroups.com
and leave a comment why this `read_frame()` should not be coroutinized

Kamil Braun

<kbraun@scylladb.com>
unread,
Jul 10, 2024, 12:38:58 PM (7 days ago) Jul 10
to Gleb Natapov, scylladb-dev@googlegroups.com
This patch would be 100 times nicer to review inside GitHub with the
"hide whitespace changes" option

(alternatively, you could split the whitespace movements into separate
commit)

Kamil Braun

<kbraun@scylladb.com>
unread,
Jul 11, 2024, 8:33:12 AM (6 days ago) Jul 11
to Gleb Natapov, scylladb-dev@googlegroups.com


On 7/9/24 1:31 PM, 'Gleb Natapov' via ScyllaDB development wrote:
What about other exception types (e.g. broken semaphore)? Previous
version of code would ignore them all (`mem_permit_fut.failed()`
branch). Here you let other exceptions fly.


> + }
>
> - tracing_request_type tracing_requested = tracing_request_type::not_requested;
> - if (f.flags & cql_frame_flags::tracing) {
> - // If tracing is requested for a specific CQL command - flush
> - // tracing info right after the command is over.
> - tracing_requested = tracing_request_type::write_on_close;
> - } else if (tracing::tracing::get_local_tracing_instance().trace_next_query()) {
> - tracing_requested = tracing_request_type::no_write_on_close;
> + if (timeout_err) {
> + if (allow_shedding) {
I think this check is redundant, you shouldn't get `semaphore_timed_out`
otherwise

Gleb Natapov

<gleb@scylladb.com>
unread,
Jul 11, 2024, 8:37:22 AM (6 days ago) Jul 11
to Kamil Braun, scylladb-dev@googlegroups.com
I think it was actually a bug in the original code. Only timeout is
expected from the semaphore. But I can do the same as well.
--
Gleb.

Kamil Braun

<kbraun@scylladb.com>
unread,
Jul 11, 2024, 8:59:55 AM (6 days ago) Jul 11
to Gleb Natapov, scylladb-dev@googlegroups.com


On 7/9/24 1:31 PM, 'Gleb Natapov' via ScyllaDB development wrote:
TBH the non-coroutinized version of this last part was more readable to
me, and here it's very hard to say if the coroutinized version is doing
the same thing.

We're not even avoiding an allocation with the coro version (because of
`do_with` and because `do_request` has its own coroutine frame)

Can we leave definitions of `request_process_future` and
`request_response_future` unchanged? Unless you find some way to make it
clearly better (e.g. avoid allocation)

Kamil Braun

<kbraun@scylladb.com>
unread,
Jul 11, 2024, 9:00:56 AM (6 days ago) Jul 11
to Gleb Natapov, scylladb-dev@googlegroups.com
I suspect that the semaphore could be broken e.g. during shutdown, in
which case we would not return an exceptional future

Kamil Braun

<kbraun@scylladb.com>
unread,
Jul 11, 2024, 9:03:48 AM (6 days ago) Jul 11
to Avi Kivity, Gleb Natapov, scylladb-dev@googlegroups.com
And here coroutinization doesn't seem to improve readability, because
there are no continuation chains. There's a single `.then` in each branch.

Perhaps we should leave it non-coroutinized.
>
>
>>  future<std::unique_ptr<cql_server::response>>
>> cql_server::connection::process_startup(uint16_t stream,
>> request_reader in, service::client_state& client_state,
>> --
>> 2.45.2
>>
>
> --
> You received this message because you are subscribed to the Google
> Groups "ScyllaDB development" group.
> To unsubscribe from this group and stop receiving emails from it, send
> an email to scylladb-dev...@googlegroups.com
> <mailto:scylladb-dev...@googlegroups.com>.
> To view this discussion on the web visit
> https://groups.google.com/d/msgid/scylladb-dev/43ba2c3243059562eb8f1e4d1d869db3ce657f48.camel%40scylladb.com <https://groups.google.com/d/msgid/scylladb-dev/43ba2c3243059562eb8f1e4d1d869db3ce657f48.camel%40scylladb.com?utm_medium=email&utm_source=footer>.

Gleb Natapov

<gleb@scylladb.com>
unread,
Jul 15, 2024, 7:00:23 AM (2 days ago) Jul 15
to Kamil Braun, scylladb-dev@googlegroups.com
Well, do_with can be dropped, as Avi noticed. Whatever it holds can be
held in the co-routine frame as well. I have a version that does it. As
for readability, it is readable only if you are used to future-based
programming. The new version looks like this:

auto istream = buf.get_istream();
if (should_paralelize) {
// Waited through _pending_requests_gate
(void)complete_request(stream, mem_permit, std::move(leave), std::move(buf), _process_request_stage(this, std::move(istream), op, stream, seastar::ref(_client_state), tracing_requested, std::move(mem_permit)));
} else {
co_await complete_request(stream, mem_permit, std::move(leave), std::move(buf), process_request_one(std::move(istream), op, stream, seastar::ref(_client_state), tracing_requested, std::move(mem_permit)));
}

And complete_request is the same as do_request but a member function
now.

But unfortunately any attempt to co-routinize the transport code is a
pessimization, since it will add some allocations in the case where all
futures are ready. The question is whether that is a good reason not to
co-routinize. If the code were written today it would almost certainly
be written with co-routines. Any code added today that looks like:

semaphore sem;
...
co_await sm.wait()

is also a pessimization, so it should probably be written using futures
if it is on the fast path. But this is transitive: the caller should be
too, otherwise it will allocate needlessly instead.

> Can we leave definitions of `request_process_future` and
> `request_response_future` unchanged? Unless you find some way to make it
> clearly better (e.g. avoid allocation)
>
> > +
> > + if (should_paralelize) {
> > + // Waited through _pending_requests_gate
> > + (void)do_with(std::move(do_request), std::move(leave), std::move(buf), [this, op, stream, mem_permit, tracing_requested] (auto& func, auto& leave, auto& buf) mutable {
> > + return func(_process_request_stage(this, buf.get_istream(), op, stream, seastar::ref(_client_state), tracing_requested, std::move(mem_permit)));
> > });
> > - });
> > + } else {
> > + co_await do_request(process_request_one(buf.get_istream(), op, stream, seastar::ref(_client_state), tracing_requested, std::move(mem_permit)));
> > + }
> > }
> > // Contiguous buffers for use with compression primitives.

--
Gleb.
Reply all
Reply to author
Forward
0 new messages