Gleb Natapov
<gleb@scylladb.com> · unread · Jul 9, 2024, 7:32:28 AM (8 days ago) · Sign in to reply to author
Sign in to forward
You do not have permission to delete messages in this group
to scylladb-dev@googlegroups.com
---
transport/server.cc | 261 +++++++++++++++++++++++---------------------
1 file changed, 136 insertions(+), 125 deletions(-)
diff --git a/transport/server.cc b/transport/server.cc
index a20a16e6ea8..cc3c64c3c8a 100644
--- a/transport/server.cc
+++ b/transport/server.cc
@@ -657,143 +657,154 @@ void cql_server::connection::handle_error(future<>&& f) {
}
future<> cql_server::connection::process_request() {
- return read_frame().then_wrapped([this] (future<std::optional<cql_binary_frame_v3>>&& v) {
- auto maybe_frame = v.get();
- if (!maybe_frame) {
- // eof
- return make_ready_future<>();
+ // Reads one frame and dispatches it. QUERY/PREPARE/EXECUTE/BATCH are processed in
+ // the background (tracked by _pending_requests_gate); other opcodes are awaited here.
+ std::optional<cql_binary_frame_v3> maybe_frame = co_await read_frame();
+
+ if (!maybe_frame) {
+ // eof
+ co_return;
+ }
+
+ auto& f = *maybe_frame;
+ auto op = f.opcode;
+ auto stream = f.stream;
+ auto length = f.length;
+ auto flags = f.flags;
+
+ const bool allow_shedding = _client_state.get_workload_type() == service::client_state::workload_type::interactive;
+ if (allow_shedding && _shed_incoming_requests) {
+ ++_server._stats.requests_shed;
+ co_await _read_buf.skip(length);
+ const char* message = "request shed due to coordinator overload";
+ clogger.debug("{}: {}, stream {}", _client_state.get_remote_address(), message, uint16_t(stream));
+ write_response(make_error(stream, exceptions::exception_code::OVERLOADED, message, tracing::trace_state_ptr()));
+ co_return;
+ }
+
+ tracing_request_type tracing_requested = tracing_request_type::not_requested;
+ if (f.flags & cql_frame_flags::tracing) {
+ // If tracing is requested for a specific CQL command - flush
+ // tracing info right after the command is over.
+ tracing_requested = tracing_request_type::write_on_close;
+ } else if (tracing::tracing::get_local_tracing_instance().trace_next_query()) {
+ tracing_requested = tracing_request_type::no_write_on_close;
+ }
+
+ auto mem_estimate = length * 2 + 8000; // Allow for extra copies and bookkeeping
+ if (mem_estimate > _server._max_request_size) {
+ const auto message = format("request size too large (frame size {:d}; estimate {:d}; allowed {:d})",
+ uint32_t(length), mem_estimate, _server._max_request_size);
+ clogger.debug("{}: {}, request dropped", _client_state.get_remote_address(), message);
+ write_response(make_error(stream, exceptions::exception_code::INVALID, message, tracing::trace_state_ptr()));
+ co_await std::exchange(_ready_to_respond, make_ready_future<>());
+ co_await _read_buf.close();
+ co_return co_await util::skip_entire_stream(_read_buf);
+ }
+
+ if (_server._stats.requests_serving > _server._max_concurrent_requests) {
+ ++_server._stats.requests_shed;
+ co_await _read_buf.skip(length);
+ const auto message = format("too many in-flight requests (configured via max_concurrent_requests_per_shard): {}",
+ _server._stats.requests_serving);
+ clogger.debug("{}: {}, request dropped", _client_state.get_remote_address(), message);
+ write_response(make_error(stream, exceptions::exception_code::OVERLOADED, message, tracing::trace_state_ptr()));
+ co_return;
+ }
+
+ const auto shedding_timeout = std::chrono::milliseconds(50);
+ auto fut = allow_shedding ? get_units(_server._memory_available, mem_estimate, shedding_timeout) :
+ get_units(_server._memory_available, mem_estimate);
+
+ if (_server._memory_available.waiters()) {
+ if (allow_shedding && !_shedding_timer.armed()) {
+ _shedding_timer.arm(shedding_timeout);
}
+ ++_server._stats.requests_blocked_memory;
+ }
- auto& f = *maybe_frame;
-
- const bool allow_shedding = _client_state.get_workload_type() == service::client_state::workload_type::interactive;
- if (allow_shedding && _shed_incoming_requests) {
- ++_server._stats.requests_shed;
- return _read_buf.skip(f.length).then([this, stream = f.stream] {
- const char* message = "request shed due to coordinator overload";
- clogger.debug("{}: {}, stream {}", _client_state.get_remote_address(), message, uint16_t(stream));
- write_response(make_error(stream, exceptions::exception_code::OVERLOADED,
- message, tracing::trace_state_ptr()));
- return make_ready_future<>();
- });
- }
+ // The callback-based code ignored every failure to obtain the units; keep
+ // doing so, and on a shedding timeout also skip the unread frame body.
+ bool timed_out = false;
+ bool acquire_failed = false;
+ semaphore_units<> units;
+ try {
+ units = co_await std::move(fut);
+ } catch (const semaphore_timed_out&) {
+ timed_out = true;
+ } catch (...) {
+ acquire_failed = true;
+ }
- tracing_request_type tracing_requested = tracing_request_type::not_requested;
- if (f.flags & cql_frame_flags::tracing) {
- // If tracing is requested for a specific CQL command - flush
- // tracing info right after the command is over.
- tracing_requested = tracing_request_type::write_on_close;
- } else if (tracing::tracing::get_local_tracing_instance().trace_next_query()) {
- tracing_requested = tracing_request_type::no_write_on_close;
+ if (timed_out || acquire_failed) {
+ if (timed_out) {
+ // Only the shedding path acquires with a timeout, so allow_shedding holds here.
+ // Cancel shedding in case no more requests are going to do that on completion
+ if (_pending_requests_gate.get_count() == 0) {
+ _shed_incoming_requests = false;
+ }
+ co_await _read_buf.skip(length);
}
+ co_return;
+ }
- auto op = f.opcode;
- auto stream = f.stream;
- auto mem_estimate = f.length * 2 + 8000; // Allow for extra copies and bookkeeping
- if (mem_estimate > _server._max_request_size) {
- const auto message = format("request size too large (frame size {:d}; estimate {:d}; allowed {:d})",
- uint32_t(f.length), mem_estimate, _server._max_request_size);
- clogger.debug("{}: {}, request dropped", _client_state.get_remote_address(), message);
- write_response(make_error(stream, exceptions::exception_code::INVALID, message, tracing::trace_state_ptr()));
- return std::exchange(_ready_to_respond, make_ready_future<>())
- .then([this] { return _read_buf.close(); })
- .then([this] { return util::skip_entire_stream(_read_buf); });
- }
+ service_permit mem_permit = make_service_permit(std::move(units));
- if (_server._stats.requests_serving > _server._max_concurrent_requests) {
- ++_server._stats.requests_shed;
- return _read_buf.skip(f.length).then([this, stream = f.stream] {
- const auto message = format("too many in-flight requests (configured via max_concurrent_requests_per_shard): {}",
- _server._stats.requests_serving);
- clogger.debug("{}: {}, request dropped", _client_state.get_remote_address(), message);
- write_response(make_error(stream, exceptions::exception_code::OVERLOADED,
- message,
- tracing::trace_state_ptr()));
- return make_ready_future<>();
- });
- }
+ fragmented_temporary_buffer buf = co_await read_and_decompress_frame(length, flags);
- const auto shedding_timeout = std::chrono::milliseconds(50);
- auto fut = allow_shedding
- ? get_units(_server._memory_available, mem_estimate, shedding_timeout).then_wrapped([this, length = f.length] (auto f) {
- try {
- return make_ready_future<semaphore_units<>>(f.get());
- } catch (semaphore_timed_out& sto) {
- // Cancel shedding in case no more requests are going to do that on completion
- if (_pending_requests_gate.get_count() == 0) {
- _shed_incoming_requests = false;
- }
- return _read_buf.skip(length).then([sto = std::move(sto)] () mutable {
- return make_exception_future<semaphore_units<>>(std::move(sto));
- });
- }
- })
- : get_units(_server._memory_available, mem_estimate);
- if (_server._memory_available.waiters()) {
- if (allow_shedding && !_shedding_timer.armed()) {
- _shedding_timer.arm(shedding_timeout);
- }
- ++_server._stats.requests_blocked_memory;
- }
+ ++_server._stats.requests_served;
+ ++_server._stats.requests_serving;
- return fut.then_wrapped([this, length = f.length, flags = f.flags, op, stream, tracing_requested] (auto mem_permit_fut) {
- if (mem_permit_fut.failed()) {
- // Ignore semaphore errors - they are expected if load shedding took place
- mem_permit_fut.ignore_ready_future();
- return make_ready_future<>();
- }
- semaphore_units<> mem_permit = mem_permit_fut.get();
- return this->read_and_decompress_frame(length, flags).then([this, op, stream, tracing_requested, mem_permit = make_service_permit(std::move(mem_permit))] (fragmented_temporary_buffer buf) mutable {
-
- ++_server._stats.requests_served;
- ++_server._stats.requests_serving;
-
- _pending_requests_gate.enter();
- auto leave = defer([this] {
- _shedding_timer.cancel();
- _shed_incoming_requests = false;
- _pending_requests_gate.leave();
- });
- auto istream = buf.get_istream();
-
-
- // Parallelize only the performance sensitive requests:
- // QUERY, PREPARE, EXECUTE, BATCH
- bool should_paralelize = (op == uint8_t(cql_binary_opcode::QUERY) ||
- op == uint8_t(cql_binary_opcode::PREPARE) ||
- op == uint8_t (cql_binary_opcode::EXECUTE) ||
- op == uint8_t(cql_binary_opcode::BATCH));
-
- future<foreign_ptr<std::unique_ptr<cql_server::response>>> request_process_future = should_paralelize ?
- _process_request_stage(this, istream, op, stream, seastar::ref(_client_state), tracing_requested, mem_permit) :
- process_request_one(istream, op, stream, seastar::ref(_client_state), tracing_requested, mem_permit);
-
- future<> request_response_future = request_process_future.then_wrapped([this, buf = std::move(buf), mem_permit, leave = std::move(leave), stream] (future<foreign_ptr<std::unique_ptr<cql_server::response>>> response_f) mutable {
- try {
- if (response_f.failed()) {
- const auto message = format("request processing failed, error [{}]", response_f.get_exception());
- clogger.error("{}: {}", _client_state.get_remote_address(), message);
- write_response(make_error(stream, exceptions::exception_code::SERVER_ERROR,
- message,
- tracing::trace_state_ptr()));
- } else {
- write_response(response_f.get(), std::move(mem_permit), _compression);
- }
- _ready_to_respond = _ready_to_respond.finally([leave = std::move(leave)] {});
- } catch (...) {
- clogger.error("{}: request processing failed: {}",
- _client_state.get_remote_address(), std::current_exception());
- }
- });
+ _pending_requests_gate.enter();
+ auto leave = defer([this] {
+ _shedding_timer.cancel();
+ _shed_incoming_requests = false;
+ _pending_requests_gate.leave();
+ });
+
+ // Parallelize only the performance sensitive requests:
+ // QUERY, PREPARE, EXECUTE, BATCH
+ bool should_paralelize = (op == uint8_t(cql_binary_opcode::QUERY) ||
+ op == uint8_t(cql_binary_opcode::PREPARE) ||
+ op == uint8_t (cql_binary_opcode::EXECUTE) ||
+ op == uint8_t(cql_binary_opcode::BATCH));
+
+ // Turns the outcome of request processing into a response. The closure owns
+ // `leave` so that, as in the callback-based version, the gate is left via the
+ // _ready_to_respond chain rather than as soon as processing completes.
+ auto do_request = [this, stream, mem_permit, leave = std::move(leave)] (future<foreign_ptr<std::unique_ptr<cql_server::response>>> f) mutable -> future<> {
+ std::exception_ptr ex;
+ foreign_ptr<std::unique_ptr<cql_server::response>> res;
+ try {
+ res = co_await std::move(f);
+ } catch (...) {
+ ex = std::current_exception();
+ }
- if (should_paralelize) {
- return make_ready_future<>();
+ try {
+ if (ex) {
+ const auto message = format("request processing failed, error [{}]", ex);
+ clogger.error("{}: {}", _client_state.get_remote_address(), message);
+ write_response(make_error(stream, exceptions::exception_code::SERVER_ERROR, message, tracing::trace_state_ptr()));
} else {
- return request_response_future;
+ write_response(std::move(res), std::move(mem_permit), _compression);
}
- });
+ // Release the gate only once the pending response chain completes.
+ _ready_to_respond = _ready_to_respond.finally([leave = std::move(leave)] {});
+ } catch (...) {
+ clogger.error("{}: request processing failed: {}", _client_state.get_remote_address(), std::current_exception());
+ }
+ co_return;
+ };
+
+ if (should_paralelize) {
+ // Waited through _pending_requests_gate
+ (void)do_with(std::move(do_request), std::move(buf), [this, op, stream, mem_permit, tracing_requested] (auto& func, auto& buf) mutable {
+ return func(_process_request_stage(this, buf.get_istream(), op, stream, seastar::ref(_client_state), tracing_requested, std::move(mem_permit)));
});
- });
+ } else {
+ co_await do_request(process_request_one(buf.get_istream(), op, stream, seastar::ref(_client_state), tracing_requested, std::move(mem_permit)));
+ }
}
// Contiguous buffers for use with compression primitives.
--
2.45.2