Understanding how to properly use nior4

24 views
Skip to first unread message

Ryan Festag

unread,
Jun 16, 2017, 11:46:53 AM6/16/17
to Ruby Socketry
I've been working on an H2 server for Ruby with native SSL support, and I would like to use nio4r to implement the reactor for it. I feel like I should be able to get better concurrency out of it than I am, so I figured I'd post what I'm doing and see if I could get any thoughts. My main issues are:

1) Am I properly handling asynchronous SSL negotiation? It does seem to work, but I also noticed that doing a blocking accept with SSL was almost twice as fast (25ms vs 40ms in the fastest cases).
2) Is it expected that calls to read_nonblock return nil? Specifically I am settings exceptions to false, so perhaps it won't raise EOF in that case? Right now, when I get a nil, I close the connection, on the assumption that it means the stream is dead. I'm not sure if that is true, but when I don't, nior4 constantly calls my monitor's value method.
3) How can I more fairly distribute accepts/reads/writes, or even prioritize them? It seems like data is immediately readable once the SSL accepts, but I'm not sure if it makes sense to only do one operation per connection per loop (i.e., once I accept, wait until the next loop before trying to read, in order to ensure more opportunity for fairness...or perform only a single read operation per connection per loop, in case one is uploading a file or something).

I've included all of the code below, but obiously there is a lot of extra stuff for creating the SSL connection and handling the HTTP2 parsing/response. Any thoughts on whether I am doing things "correctly", or how best to handle the issues above, would be great. This is mostly a learning experience for me, as I haven't done much with evented systems before.

require 'openssl'
require 'http/2'
require 'socketry'
require 'ruby-prof'

class H2Server
  ALPN_PROTOCOL        
= 'h2'
  ALPN_SELECT_CALLBACK
= ->(ps){ ps.find { |p| ALPN_PROTOCOL == p }}
  ECDH_CURVES          
= 'P-256'
  TMP_ECDH_CALLBACK    
= ->(*_){ OpenSSL::PKey::EC.new 'prime256v1' }

  ECDH_OPENSSL_MIN_VERSION
= '2.0'
  ALPN_OPENSSL_MIN_VERSION
= 0x10002001

 
def initialize(host, port, **opts)
   
@selector = NIO::Selector.new

    puts
"Listening on #{host}:#{port}"
   
@server = TCPServer.new(host, port)
   
@context = create_ssl_context(opts)

    monitor
= @selector.register(@server, :r)
    monitor
.value = proc { |m| accept }
 
end

 
def run
   
RubyProf.start
    loop
do
     
begin
       
@selector.select { |monitor| monitor.value.call(monitor) }
     
rescue => e
        puts
"#{e.class}: #{e.message} (select)"
        puts e
.backtrace.join("\n")
     
end
   
end
 
ensure
    result
= RubyProf.stop
    printer
= RubyProf::FlatPrinter.new(result)
    printer
.print(STDOUT)
 
end

 
def accept
    sock
= @server.accept_nonblock exception: false
   
return sock if sock == :wait_readable
   
    ssl_sock
= OpenSSL::SSL::SSLSocket.new(sock, @context)
    ssl_sock
.sync = true
    client
= accept_ssl(ssl_sock)
   
if client == :wait_readable or client == :wait_writeable
     
#puts "SSL not ready, waiting for ready"
      monitor
= @selector.register(ssl_sock, :r)
      monitor
.value = proc { accept_ssl(ssl_sock, monitor) }
   
end
 
end
 
def accept_ssl ssl_sock, monitor=nil
   
begin
     
#client = ssl_sock.accept
      client
= ssl_sock.accept_nonblock exception: false
     
return client if client == :wait_readable or client == :wait_writeable

     
#The client will disconnect automatically if ALPN
     
#re-negotiates H2
      _
, port, host = client.peeraddr
      puts
"*** #{host}:#{port} connected"

      resp
= "TEST DATA"
      conn
= HTTP2::Server.new
      conn
.on(:frame) {|bytes| client.write(bytes)}
      conn
.on(:stream) do |stream|
        req
= {}
        buffer
= NIO::ByteBuffer.new(16384)
        stream
.on(:headers) do |h|
          req
= Hash[*h.flatten]
       
end
        stream
.on(:data) do |d|
          buffer
<< d
       
end
        stream
.on(:half_close) do
          buffer
.flip
          response
= nil
          stream
.headers({
           
':status' => '200',
           
'content-length' => resp.length.to_s
         
}, end_stream: false)
          stream
.data(resp)
       
end
     
end

      read
(client, conn, monitor)
   
rescue => e
      puts
"#{e.class}: #{e.message} (accept)"
     
if client
       
@selector.deregister(client)
        client
.close
     
end
      puts
"Closed (accept)"
   
end
 
end

 
def read(socket, conn, monitor)
   
#puts "reading #{socket}, #{monitor.inspect}"
   
until (data = socket.read_nonblock(4096, exception: false)) == :wait_readable
     
raise "No data" if data.nil?
      conn
<< data
   
end
   
#puts "no data, waiting for more"
    puts
"Creating monitor" unless monitor
    monitor
= @selector.register(socket, :r) unless monitor
    monitor
.value = proc {read(socket, conn, monitor)}
 
rescue EOFError, Errno::EPIPE, Errno::ECONNRESET => e
    puts e
.message
   
begin
      _
, port, host = socket.peeraddr
      puts
"*** #{host}:#{port} disconnected (read)"
   
rescue
      puts
"Something disconnected, not sure what (read)"
   
end
   
@selector.deregister(socket)
    socket
.close
   
false
 
rescue => e
    puts
"#{e.class}: #{e.message} (read)"
   
@selector.deregister(socket)
   
begin
      _
, port, host = socket.peeraddr
      puts
"*** #{host}:#{port} disconnected (read)"
   
rescue
      puts
"Something disconnected, not sure what (read)"
   
end
    socket
.close
   
false
 
end
 
def create_ssl_context **opts
    ctx                  
= OpenSSL::SSL::SSLContext.new
    ctx
.ca_file          = opts[:ca_file] if opts[:ca_file]
    ctx
.ca_path          = opts[:ca_path] if opts[:ca_path]
    ctx
.cert             = context_cert opts[:cert]
    ctx
.ciphers          = opts[:ciphers] || OpenSSL::SSL::SSLContext::DEFAULT_PARAMS[:ciphers]
    ctx
.extra_chain_cert = context_extra_chain_cert opts[:extra_chain_cert]
    ctx
.key              = context_key opts[:key]
    ctx
.options          = opts[:options] || OpenSSL::SSL::SSLContext::DEFAULT_PARAMS[:options]
    ctx
.ssl_version      = :TLSv1_2
    context_ecdh ctx
    context_set_protocols ctx
    ctx
 
end

 
if OpenSSL::VERSION >= ECDH_OPENSSL_MIN_VERSION
   
def context_ecdh ctx
      ctx
.ecdh_curves = ECDH_CURVES
   
end
 
else
   
def context_ecdh ctx
      ctx
.tmp_ecdh_callback = TMP_ECDH_CALLBACK
   
end
 
end

 
def context_cert cert
   
case cert
   
when String
      cert
= File.read cert if File.exist? cert
     
OpenSSL::X509::Certificate.new cert
   
when OpenSSL::X509::Certificate
      cert
   
end
 
end

 
def context_key key
   
case key
   
when String
      key
= File.read key if File.exist? key
     
OpenSSL::PKey::RSA.new key
   
when OpenSSL::PKey::RSA
      key
   
end
 
end
 
def context_extra_chain_cert chain
   
case chain
   
when String
      chain
= File.read chain if File.exist? chain
     
[OpenSSL::X509::Certificate.new(chain)]
   
when OpenSSL::X509::Certificate
     
[chain]
   
when Array
      chain
   
end
 
end

 
if OpenSSL::OPENSSL_VERSION_NUMBER >= ALPN_OPENSSL_MIN_VERSION
   
def context_set_protocols ctx
      ctx
.alpn_protocols = [ALPN_PROTOCOL]
      ctx
.alpn_select_cb = ALPN_SELECT_CALLBACK
   
end
 
else
   
def context_set_protocols ctx
      ctx
.npn_protocols = [ALPN_PROTOCOL]
      ctx
.npn_select_cb = ALPN_SELECT_CALLBACK
   
end
 
end
end

H2Server
.new("localhost", 1234, key: 'test.key', cert: 'test.crt').run if $PROGRAM_NAME == __FILE__


Tony Arcieri

unread,
Jun 16, 2017, 11:50:09 AM6/16/17
to Ryan Festag, Ruby Socketry
On Fri, Jun 16, 2017 at 8:46 AM, Ryan Festag <rfe...@gmail.com> wrote:
I've been working on an H2 server for Ruby with native SSL support, and I would like to use nio4r to implement the reactor for it. I feel like I should be able to get better concurrency out of it than I am, so I figured I'd post what I'm doing and see if I could get any thoughts.

Well, for starters, this is wrong: 

    if client == :wait_readable or client == :wait_writeable
      
#puts "SSL not ready, waiting for ready"
      monitor 
= @selector.register(ssl_sock, :r)

You need to wait for the socket to become writable in the event it asks you to "wait_writeable".

--
Tony Arcieri

Ryan Festag

unread,
Jun 16, 2017, 1:03:52 PM6/16/17
to Ruby Socketry
Good to know. I understood it could tell me it was waiting to write, but I assumed I only eventually care about the readability. When the :wait_writeable monitor's value is eventually called, would that imply the connection might be ready to be accepted? Or would I always end up waiting for readability anyways? The previous code seemed to work, so I'm curious how that works under the hood.

Would something like this be more correct? Or should I only register either a read or write? In conjunction with this code, I am deregistering the socket before I go into a read, letting it register again with just :r. I'm not sure if that is correct either
def accept
    sock
= @server.accept_nonblock exception: false
   
return sock if sock == :wait_readable

    ssl_sock
= OpenSSL::SSL::SSLSocket.new(sock, @context)
    ssl_sock
.sync = true
    client
= accept_ssl(ssl_sock)

   
if client == :wait_readable || client == :wait_writeable
      monitor
= @selector.register(ssl_sock, :rw)

      monitor
.value = proc { accept_ssl(ssl_sock, monitor) }
   
end
 
end
 
def accept_ssl ssl_sock, monitor=nil
   
begin
     
#client = ssl_sock.accept
      client
= ssl_sock.accept_nonblock exception: false

     
return client if  client == :wait_readable || client == :wait_writeable

     
#The client will disconnect automatically if ALPN
     
#re-negotiates H2


     
@selector.deregister(ssl_sock)
      read
(client, conn, nil)

   
rescue => e
      puts
"#{e.class}: #{e.message} (accept)"
     
if client
       
@selector.deregister(client)
        client
.close
     
end
      puts
"Closed (accept)"
   
end
 
end





Lastly, is it correct to replace the value callback for the monitor? That's effectively how I'm switching from waiting to accept the SSL connection to reading from the socket. I ran into issues creating a new monitor, because we always ended up waiting for the SSL handshake to complete, so there was already a monitor already in place waiting to read from the socket.

Tony Arcieri

unread,
Jun 18, 2017, 9:30:59 PM6/18/17
to Ryan Festag, Ruby Socketry
On Fri, Jun 16, 2017 at 10:03 AM, Ryan Festag <rfe...@gmail.com> wrote:
    if client == :wait_readable || client == :wait_writeable
      monitor
= @selector.register(ssl_sock, :rw)
      monitor
.value = proc { accept_ssl(ssl_sock, monitor) }
   
end

This doesn't work either. You need to wait for the exact events that get surfaced for the I/O operations you're trying to perform.

Consider this scenario: you've just made an SSL connection and sent your first part of the handshake (the ClientHello).

Now you're waiting for a response. The read operation from the remote server returns :wait_readable (it hasn't answered yet). So we're hitting this branch.

You register this socket with the selector. It selects immediately even though no data is ready to read!

Why? Because your write buffer is empty, so the socket is writable. But you don't care about writing, you only care about reading, and there's no read to perform.

Left unchecked this will cause a tight loop in your app where it keeps selecting for writing even though you don't care about writing.

You need to select for the *exact* I/O operations you're actually interested in *performing*. Monitoring for spurious events will cause serious problems.

--
Tony Arcieri

Ryan Festag

unread,
Jun 24, 2017, 12:49:07 PM6/24/17
to Ruby Socketry
Sorry for the delayed response, I didn't have a chance to go back and play with this a bit more until today. What you say above makes perfect sense, and I think I was able to get something working. Originally I assumed I would need to deregister the socket in order to reliably switch between waiting for reads and writes, but that actually caused a lot of failed connections (almost 1/3 of all h2load requests). I found out that I can change my interests directly on the monitor, and I'm able to listen for the specific event of interest. I came up with something like this:
  def accept
    sock
= @server.accept_nonblock exception: false
   
return sock if sock == :wait_readable

    ssl_sock
= OpenSSL::SSL::SSLSocket.new(sock, @context)
    ssl_sock
.sync = true

    client
,monitor = accept_ssl(ssl_sock)

   
if monitor
     
@selector.interests = :r if :wait_readable
     
@selector.interests = :w if :wait_writeable
   
else
     
if client == :wait_readable
        monitor
= @selector.register(ssl_sock, :r)

        monitor
.value = proc { accept_ssl(ssl_sock, monitor) }

     
elsif client == :wait_writeable
        monitor
= @selector.register(ssl_sock, :w)

        monitor
.value = proc { accept_ssl(ssl_sock, monitor) }
     
end

   
end
 
end
 
def accept_ssl ssl_sock, monitor=nil
   
begin
     
#client = ssl_sock.accept
      client
= ssl_sock.accept_nonblock exception: false


     
return [client,monitor] if  client == :wait_readable || client == :wait_writeable
     
...
 
end

Does this seem more correct?

Tony Arcieri

unread,
Jun 24, 2017, 12:59:10 PM6/24/17
to Ryan Festag, Ruby Socketry
On Sat, Jun 24, 2017 at 9:49 AM, Ryan Festag <rfe...@gmail.com> wrote:
      @selector.interests = :r if :wait_readable
     
@selector.interests = :w if :wait_writeable

These are both always going to be true, and the second will clobber the first. 

Ryan Festag

unread,
Jun 24, 2017, 1:07:52 PM6/24/17
to Ruby Socketry
Ah, I must have dropped the "client ==" on accident at some point. Should have been this:


@selector.interests = :r if client == :wait_readable
@selector.interests = :w if client == :wait_writeable
Reply all
Reply to author
Forward
0 new messages