I've been working on an H2 server for Ruby with native SSL support, and I would like to use nio4r to implement the reactor for it. I feel like I should be able to get better concurrency out of it than I am, so I figured I'd post what I'm doing and see if I could get any thoughts. My main issues are:
1) Am I properly handling asynchronous SSL negotiation? It does seem to work, but I also noticed that doing a blocking accept with SSL was almost twice as fast (25ms vs 40ms in the fastest cases).
2) Is it expected that calls to read_nonblock return nil? Specifically I am settings exceptions to false, so perhaps it won't raise EOF in that case? Right now, when I get a nil, I close the connection, on the assumption that it means the stream is dead. I'm not sure if that is true, but when I don't, nior4 constantly calls my monitor's value method.
3) How can I more fairly distribute accepts/reads/writes, or even prioritize them? It seems like data is immediately readable once the SSL accepts, but I'm not sure if it makes sense to only do one operation per connection per loop (i.e., once I accept, wait until the next loop before trying to read, in order to ensure more opportunity for fairness...or perform only a single read operation per connection per loop, in case one is uploading a file or something).
I've included all of the code below, but obiously there is a lot of extra stuff for creating the SSL connection and handling the HT
TP2 parsing/response. Any thoughts on whether I am doing things "correctly", or how best to handle the issues above, would be great. This is mostly a learning experience for me, as I haven't done much with evented systems before.
require 'openssl'
require 'http/2'
require 'socketry'
require 'ruby-prof'
class H2Server
ALPN_PROTOCOL = 'h2'
ALPN_SELECT_CALLBACK = ->(ps){ ps.find { |p| ALPN_PROTOCOL == p }}
ECDH_CURVES = 'P-256'
TMP_ECDH_CALLBACK = ->(*_){ OpenSSL::PKey::EC.new 'prime256v1' }
ECDH_OPENSSL_MIN_VERSION = '2.0'
ALPN_OPENSSL_MIN_VERSION = 0x10002001
def initialize(host, port, **opts)
@selector = NIO::Selector.new
puts "Listening on #{host}:#{port}"
@server = TCPServer.new(host, port)
@context = create_ssl_context(opts)
monitor = @selector.register(@server, :r)
monitor.value = proc { |m| accept }
end
def run
RubyProf.start
loop do
begin
@selector.select { |monitor| monitor.value.call(monitor) }
rescue => e
puts "#{e.class}: #{e.message} (select)"
puts e.backtrace.join("\n")
end
end
ensure
result = RubyProf.stop
printer = RubyProf::FlatPrinter.new(result)
printer.print(STDOUT)
end
def accept
sock = @server.accept_nonblock exception: false
return sock if sock == :wait_readable
ssl_sock = OpenSSL::SSL::SSLSocket.new(sock, @context)
ssl_sock.sync = true
client = accept_ssl(ssl_sock)
if client == :wait_readable or client == :wait_writeable
#puts "SSL not ready, waiting for ready"
monitor = @selector.register(ssl_sock, :r)
monitor.value = proc { accept_ssl(ssl_sock, monitor) }
end
end
def accept_ssl ssl_sock, monitor=nil
begin
#client = ssl_sock.accept
client = ssl_sock.accept_nonblock exception: false
return client if client == :wait_readable or client == :wait_writeable
#The client will disconnect automatically if ALPN
#re-negotiates H2
_, port, host = client.peeraddr
puts "*** #{host}:#{port} connected"
resp = "TEST DATA"
conn = HTTP2::Server.new
conn.on(:frame) {|bytes| client.write(bytes)}
conn.on(:stream) do |stream|
req = {}
buffer = NIO::ByteBuffer.new(16384)
stream.on(:headers) do |h|
req = Hash[*h.flatten]
end
stream.on(:data) do |d|
buffer << d
end
stream.on(:half_close) do
buffer.flip
response = nil
stream.headers({
':status' => '200',
'content-length' => resp.length.to_s
}, end_stream: false)
stream.data(resp)
end
end
read(client, conn, monitor)
rescue => e
puts "#{e.class}: #{e.message} (accept)"
if client
@selector.deregister(client)
client.close
end
puts "Closed (accept)"
end
end
def read(socket, conn, monitor)
#puts "reading #{socket}, #{monitor.inspect}"
until (data = socket.read_nonblock(4096, exception: false)) == :wait_readable
raise "No data" if data.nil?
conn << data
end
#puts "no data, waiting for more"
puts "Creating monitor" unless monitor
monitor = @selector.register(socket, :r) unless monitor
monitor.value = proc {read(socket, conn, monitor)}
rescue EOFError, Errno::EPIPE, Errno::ECONNRESET => e
puts e.message
begin
_, port, host = socket.peeraddr
puts "*** #{host}:#{port} disconnected (read)"
rescue
puts "Something disconnected, not sure what (read)"
end
@selector.deregister(socket)
socket.close
false
rescue => e
puts "#{e.class}: #{e.message} (read)"
@selector.deregister(socket)
begin
_, port, host = socket.peeraddr
puts "*** #{host}:#{port} disconnected (read)"
rescue
puts "Something disconnected, not sure what (read)"
end
socket.close
false
end
def create_ssl_context **opts
ctx = OpenSSL::SSL::SSLContext.new
ctx.ca_file = opts[:ca_file] if opts[:ca_file]
ctx.ca_path = opts[:ca_path] if opts[:ca_path]
ctx.cert = context_cert opts[:cert]
ctx.ciphers = opts[:ciphers] || OpenSSL::SSL::SSLContext::DEFAULT_PARAMS[:ciphers]
ctx.extra_chain_cert = context_extra_chain_cert opts[:extra_chain_cert]
ctx.key = context_key opts[:key]
ctx.options = opts[:options] || OpenSSL::SSL::SSLContext::DEFAULT_PARAMS[:options]
ctx.ssl_version = :TLSv1_2
context_ecdh ctx
context_set_protocols ctx
ctx
end
if OpenSSL::VERSION >= ECDH_OPENSSL_MIN_VERSION
def context_ecdh ctx
ctx.ecdh_curves = ECDH_CURVES
end
else
def context_ecdh ctx
ctx.tmp_ecdh_callback = TMP_ECDH_CALLBACK
end
end
def context_cert cert
case cert
when String
cert = File.read cert if File.exist? cert
OpenSSL::X509::Certificate.new cert
when OpenSSL::X509::Certificate
cert
end
end
def context_key key
case key
when String
key = File.read key if File.exist? key
OpenSSL::PKey::RSA.new key
when OpenSSL::PKey::RSA
key
end
end
def context_extra_chain_cert chain
case chain
when String
chain = File.read chain if File.exist? chain
[OpenSSL::X509::Certificate.new(chain)]
when OpenSSL::X509::Certificate
[chain]
when Array
chain
end
end
if OpenSSL::OPENSSL_VERSION_NUMBER >= ALPN_OPENSSL_MIN_VERSION
def context_set_protocols ctx
ctx.alpn_protocols = [ALPN_PROTOCOL]
ctx.alpn_select_cb = ALPN_SELECT_CALLBACK
end
else
def context_set_protocols ctx
ctx.npn_protocols = [ALPN_PROTOCOL]
ctx.npn_select_cb = ALPN_SELECT_CALLBACK
end
end
end
H2Server.new("localhost", 1234, key: 'test.key', cert: 'test.crt').run if $PROGRAM_NAME == __FILE__