[PATCH v1 02/11] transport: co-routinize cql_server::connection::process_on_shard

0 views
Skip to first unread message

Gleb Natapov

<gleb@scylladb.com>
unread,
Jul 9, 2024, 7:32:26 AM (8 days ago) Jul 9
to scylladb-dev@googlegroups.com
---
transport/server.cc | 23 ++++++++++-------------
1 file changed, 10 insertions(+), 13 deletions(-)

diff --git a/transport/server.cc b/transport/server.cc
index 8309fea0c95..e1391f08d19 100644
--- a/transport/server.cc
+++ b/transport/server.cc
@@ -965,20 +965,17 @@ cql_server::connection::process_on_shard(::shared_ptr<messages::result_message::
return _server.container().invoke_on(*bounce_msg->move_to_shard(), _server._config.bounce_request_smp_service_group,
[this, is = std::move(is), cs = cs.move_to_other_shard(), stream, permit = std::move(permit), process_fn,
gt = tracing::global_trace_state_ptr(std::move(trace_state)),
- cached_vals = std::move(bounce_msg->take_cached_pk_function_calls())] (cql_server& server) {
+ cached_vals = std::move(bounce_msg->take_cached_pk_function_calls())] (cql_server& server) -> future<cql_server::result_with_foreign_response_ptr> {
+ auto trace_state = tracing::trace_state_ptr(gt);
service::client_state client_state = cs.get();
- return do_with(bytes_ostream(), std::move(client_state), std::move(cached_vals),
- [this, &server, is = std::move(is), stream, process_fn,
- trace_state = tracing::trace_state_ptr(gt)] (bytes_ostream& linearization_buffer,
- service::client_state& client_state,
- cql3::computed_function_values& cached_vals) mutable {
- request_reader in(is, linearization_buffer);
- return process_fn(client_state, server._query_processor, in, stream, _version,
- /* FIXME */empty_service_permit(), std::move(trace_state), false, std::move(cached_vals)).then([] (auto msg) {
- // result here has to be foreign ptr
- return std::get<cql_server::result_with_foreign_response_ptr>(std::move(msg));
- });
- });
+
+ bytes_ostream linearization_buffer;
+ request_reader in(is, linearization_buffer);
+
+ auto msg = co_await process_fn(client_state, server._query_processor, in, stream, _version,
+ /* FIXME */empty_service_permit(), std::move(trace_state), false, std::move(cached_vals));
+ // result here has to be foreign ptr
+ co_return std::get<cql_server::result_with_foreign_response_ptr>(std::move(msg));
});
}

--
2.45.2

Kamil Braun

<kbraun@scylladb.com>
unread,
Jul 10, 2024, 12:02:45 PM (7 days ago) Jul 10
to Gleb Natapov, scylladb-dev@googlegroups.com


On 7/9/24 1:30 PM, 'Gleb Natapov' via ScyllaDB development wrote:
> ---
> transport/server.cc | 23 ++++++++++-------------
> 1 file changed, 10 insertions(+), 13 deletions(-)
>
> diff --git a/transport/server.cc b/transport/server.cc
> index 8309fea0c95..e1391f08d19 100644
> --- a/transport/server.cc
> +++ b/transport/server.cc
> @@ -965,20 +965,17 @@ cql_server::connection::process_on_shard(::shared_ptr<messages::result_message::
> return _server.container().invoke_on(*bounce_msg->move_to_shard(), _server._config.bounce_request_smp_service_group,
> [this, is = std::move(is), cs = cs.move_to_other_shard(), stream, permit = std::move(permit), process_fn,
> gt = tracing::global_trace_state_ptr(std::move(trace_state)),
> - cached_vals = std::move(bounce_msg->take_cached_pk_function_calls())] (cql_server& server) {
> + cached_vals = std::move(bounce_msg->take_cached_pk_function_calls())] (cql_server& server) -> future<cql_server::result_with_foreign_response_ptr> > + auto trace_state = tracing::trace_state_ptr(gt);
> service::client_state client_state = cs.get();
> - return do_with(bytes_ostream(), std::move(client_state), std::move(cached_vals),
> - [this, &server, is = std::move(is), stream, process_fn,
> - trace_state = tracing::trace_state_ptr(gt)] (bytes_ostream& linearization_buffer,
do_with was keeping client_state and cached_vals alive until process_fn
finished (future became ready). Now nothing keeps them alive, they get
destroyed when process_fn returns? (They are captures of the lambda, not
parameters of it)

Gleb Natapov

<gleb@scylladb.com>
unread,
Jul 10, 2024, 12:43:15 PM (7 days ago) Jul 10
to Kamil Braun, scylladb-dev@googlegroups.com
On Wed, Jul 10, 2024 at 06:02:41PM +0200, Kamil Braun wrote:
>
>
> On 7/9/24 1:30 PM, 'Gleb Natapov' via ScyllaDB development wrote:
> > ---
> > transport/server.cc | 23 ++++++++++-------------
> > 1 file changed, 10 insertions(+), 13 deletions(-)
> >
> > diff --git a/transport/server.cc b/transport/server.cc
> > index 8309fea0c95..e1391f08d19 100644
> > --- a/transport/server.cc
> > +++ b/transport/server.cc
> > @@ -965,20 +965,17 @@ cql_server::connection::process_on_shard(::shared_ptr<messages::result_message::
> > return _server.container().invoke_on(*bounce_msg->move_to_shard(), _server._config.bounce_request_smp_service_group,
> > [this, is = std::move(is), cs = cs.move_to_other_shard(), stream, permit = std::move(permit), process_fn,
> > gt = tracing::global_trace_state_ptr(std::move(trace_state)),
> > - cached_vals = std::move(bounce_msg->take_cached_pk_function_calls())] (cql_server& server) {
> > + cached_vals = std::move(bounce_msg->take_cached_pk_function_calls())] (cql_server& server) -> future<cql_server::result_with_foreign_response_ptr> > + auto trace_state = tracing::trace_state_ptr(gt);
> > service::client_state client_state = cs.get();
> > - return do_with(bytes_ostream(), std::move(client_state), std::move(cached_vals),
> > - [this, &server, is = std::move(is), stream, process_fn,
> > - trace_state = tracing::trace_state_ptr(gt)] (bytes_ostream& linearization_buffer,
> do_with was keeping client_state and cached_vals alive until process_fn
> finished (future became ready). Now nothing keeps them alive, they get
> destroyed when process_fn returns? (They are captures of the lambda, not
> parameters of it)
>
invoke_on keeps lambda alive until its future resolves. This is what surprised
me in the lwt code.
Reply all
Reply to author
Forward
0 new messages