Adds seastar::net::dns_resolver with methods to query names and addresses.
Wraps a (somewhat nasty) state machine that emulates BSD-style non-blocking
I/O on top of seastar, and uses our patched virtual I/O provider struct for c-ares.
Like the rest of seastar, it currently only handles IPv4.
---
configure.py | 8 +-
net/dns.hh | 131 ++++++++
net/dns.cc | 899 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
tests/dns_test.cc | 105 +++++++
4 files changed, 1142 insertions(+), 1 deletion(-)
create mode 100644 net/dns.hh
create mode 100644 net/dns.cc
create mode 100644 tests/dns_test.cc
diff --git a/configure.py b/configure.py
index c091318..6742554 100755
--- a/configure.py
+++ b/configure.py
@@ -209,6 +209,7 @@ tests = [
'tests/scollectd_test',
'tests/perf/perf_fstream',
'tests/json_formatter_test',
+ 'tests/dns_test',
]
apps = [
@@ -269,6 +270,7 @@ libnet = [
'net/dhcp.cc',
'net/tls.cc',
'net/inet_address.cc',
+ 'net/dns.cc',
]
core = [
@@ -428,6 +430,7 @@ deps = {
'tests/scollectd_test': ['tests/scollectd_test.cc'] + core,
'tests/perf/perf_fstream': ['tests/perf/perf_fstream.cc'] + core,
'tests/json_formatter_test': ['tests/json_formatter_test.cc'] + core + http,
+ 'tests/dns_test': ['tests/dns_test.cc'] + core + libnet,
}
boost_tests = [
@@ -447,6 +450,7 @@ boost_tests = [
'tests/connect_test',
'tests/scollectd_test',
'tests/json_formatter_test',
+ 'tests/dns_test',
]
for bt in boost_tests:
@@ -673,6 +677,7 @@ with open(buildfile, 'w') as f:
build {dpdk_deps} : dpdkmake {dpdk_sources}
''').format(**globals()))
for mode in build_modes:
+ objdeps = {}
modeval = modes[mode]
if modeval['sanitize'] and not do_sanitize:
print('Note: --static disables debug mode sanitizers')
@@ -711,6 +716,7 @@ with open(buildfile, 'w') as f:
build $builddir/{mode}/{cares_src_lib} : caresmake_{mode} $builddir/{mode}/{cares_dir}/Makefile | {cares_sources}
build $builddir/{mode}/lib{cares_lib}.a : copy_file $builddir/{mode}/{cares_src_lib}
''').format(srcdir = os.getcwd(), cares_opts=modeval['cares_opts'], **globals()))
+ objdeps['$builddir/' + mode + '/net/dns.o'] = ' $builddir/' + mode + '/' + cares_dir + '/ares_build.h'
compiles = {}
ragels = {}
swaggers = {}
@@ -773,7 +779,7 @@ with open(buildfile, 'w') as f:
for obj in compiles:
src = compiles[obj]
gen_headers = list(ragels.keys()) + list(swaggers.keys()) + list(protobufs.keys())
- f.write('build {}: cxx.{} {} || {} \n'.format(obj, mode, src, ' '.join(gen_headers) + dpdk_deps))
+ f.write('build {}: cxx.{} {} || {} \n'.format(obj, mode, src, ' '.join(gen_headers) + dpdk_deps + objdeps.get(obj, '')))
for hh in ragels:
src = ragels[hh]
f.write('build {}: ragel {}\n'.format(hh, src))
diff --git a/net/dns.hh b/net/dns.hh
new file mode 100644
index 0000000..74b9020
--- /dev/null
+++ b/net/dns.hh
@@ -0,0 +1,131 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Copyright 2016 Cloudius Systems
+ */
+
+#pragma once
+
+#include <chrono>
+#include <vector>
+#include <unordered_map>
+#include <memory>
+#include <experimental/optional>
+
+#include "../core/future.hh"
+#include "../core/sstring.hh"
+#include "../core/shared_ptr.hh"
+#include "inet_address.hh"
+
+struct ipv4_addr;
+
+class socket_address;
+class network_stack;
+
+/**
+ * C-ares based dns query support.
+ * Handles name- and ip-based resolution.
+ *
+ */
+
+namespace seastar {
+namespace net {
+
+/**
+ * C++ counterpart of the C ::hostent lookup result.
+ */
+struct hostent {
+    /// All names of the host: the primary (official) name first,
+    /// followed by any aliases.
+    std::vector<sstring> names;
+    /// All addresses of the host; the primary address comes first.
+    std::vector<inet_address> addr_list;
+};
+
+/// Optional address family for name lookups; unset means ipv4.
+using opt_family = std::experimental::optional<inet_address::family>;
+
+/**
+ * A DNS resolver object.
+ * Wraps the query logic & networking.
+ * Can be instantiated with options and your network
+ * stack of choice, though for "normal" non-test
+ * querying, you are probably better off with the
+ * global calls in the seastar::net::dns namespace below.
+ */
+class dns_resolver {
+public:
+    /**
+     * Resolver configuration. Every field is optional; only the
+     * fields that are set are passed on to c-ares.
+     */
+    struct options {
+        // If true, queries are sent over tcp instead of udp.
+        std::experimental::optional<bool>
+            use_tcp_query;
+        // DNS servers to query (ipv4 addresses only).
+        std::experimental::optional<std::vector<inet_address>>
+            servers;
+        // Per-query timeout.
+        std::experimental::optional<std::chrono::milliseconds>
+            timeout;
+        // Non-default server ports for tcp / udp queries.
+        std::experimental::optional<uint16_t>
+            tcp_port, udp_port;
+        // Domains to search for non-qualified names.
+        std::experimental::optional<std::vector<sstring>>
+            domains;
+    };
+
+    dns_resolver();
+    dns_resolver(dns_resolver&&);
+    dns_resolver(const options&);
+    dns_resolver(::network_stack&, const options& = {});
+    ~dns_resolver();
+
+    dns_resolver& operator=(dns_resolver&&);
+
+    /**
+     * Resolves a hostname to one or more addresses and aliases.
+     * The family defaults to ipv4 when not given.
+     */
+    future<hostent> get_host_by_name(const sstring&, opt_family = {});
+    /**
+     * Resolves an address to one or more names (primary name first)
+     * and addresses.
+     */
+    future<hostent> get_host_by_addr(const inet_address&);
+
+    /**
+     * Resolves a hostname to one (primary) address.
+     * The family defaults to ipv4 when not given.
+     */
+    future<inet_address> resolve_name(const sstring&, opt_family = {});
+    /**
+     * Resolves an address to one (primary) name.
+     */
+    future<sstring> resolve_addr(const inet_address&);
+
+    /**
+     * Shuts the object down: cancels outstanding queries and closes
+     * all sockets. The returned future resolves once in-flight socket
+     * operations have finished. Great for tests.
+     */
+    future<> close();
+private:
+    class impl;
+    ::shared_ptr<impl> _impl;
+};
+
+namespace dns {
+
+// Convenience wrappers around dns_resolver (see above for semantics).
+// These functions simply query through a shard-local (thread_local)
+// resolver that uses the default network stack and default options.
+future<hostent> get_host_by_name(const sstring&, opt_family = {});
+future<hostent> get_host_by_addr(const inet_address&);
+
+future<inet_address> resolve_name(const sstring&, opt_family = {});
+future<sstring> resolve_addr(const inet_address&);
+
+}
+
+}
+}
diff --git a/net/dns.cc b/net/dns.cc
new file mode 100644
index 0000000..5c7bd10
--- /dev/null
+++ b/net/dns.cc
@@ -0,0 +1,899 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Copyright 2016 Cloudius Systems
+ */
+
+#include <chrono>
+#include <experimental/string_view>
+
+#include <c-ares/ares.h>
+
+#include "ip.hh"
+#include "api.hh"
+#include "dns.hh"
+#include "core/sstring.hh"
+#include "core/timer.hh"
+#include "core/reactor.hh"
+#include "core/gate.hh"
+#include "util/log.hh"
+
+// File-local logger used by every part of the resolver implementation.
+static seastar::logger dns_log{"dns_resolver"};
+
+/**
+ * std::error_category for c-ares status codes, so that c-ares failures
+ * can be reported as std::system_error(code, ares_errorc).
+ */
+class ares_error_category : public std::error_category {
+public:
+    constexpr ares_error_category() noexcept : std::error_category{} {}
+    const char * name() const noexcept override {
+        return "C-Ares";
+    }
+    std::string message(int error) const override {
+        switch (error) {
+        /* Server error codes (ARES_ENODATA indicates no relevant answer) */
+        case ARES_ENODATA: return "No data";
+        case ARES_EFORMERR: return "Form error";
+        case ARES_ESERVFAIL: return "Server failure";
+        case ARES_ENOTFOUND: return "Not found";
+        case ARES_ENOTIMP: return "Not implemented";
+        case ARES_EREFUSED: return "Refused";
+
+        /* Locally generated error codes */
+        case ARES_EBADQUERY: return "Bad query";
+        case ARES_EBADNAME: return "Bad name";
+        case ARES_EBADFAMILY: return "Bad family";
+        case ARES_EBADRESP: return "Bad response";
+        case ARES_ECONNREFUSED: return "Connection refused";
+        case ARES_ETIMEOUT: return "Timeout";
+        case ARES_EOF: return "EOF";
+        case ARES_EFILE: return "File error";
+        case ARES_ENOMEM: return "No memory";
+        case ARES_EDESTRUCTION: return "Destruction";
+        case ARES_EBADSTR: return "Bad string";
+
+        /* ares_getnameinfo error codes */
+        case ARES_EBADFLAGS: return "Invalid flags";
+
+        /* ares_getaddrinfo error codes */
+        case ARES_ENONAME: return "No name";
+        case ARES_EBADHINTS: return "Bad hints";
+
+        /* Uninitialized library error code */
+        case ARES_ENOTINITIALIZED: return "Not initialized";
+
+        /* ares_library_init error codes */
+        case ARES_ELOADIPHLPAPI: return "Load IPHLPAPI";
+        case ARES_EADDRGETNETWORKPARAMS: return "Get network parameters";
+
+        /* More error codes */
+        case ARES_ECANCELLED: return "Cancelled";
+        default:
+            return "Unknown error";
+        }
+    }
+};
+
+static const ares_error_category ares_errorc;
+
+static void check_ares_error(int error) {
+ if (error != ARES_SUCCESS) {
+ throw std::system_error(error, ares_errorc);
+ }
+}
+
+/**
+ * Scoped owner of the c-ares global library state: initializes it on
+ * construction (throwing on failure) and cleans it up on destruction.
+ */
+struct ares_initializer {
+    ares_initializer() {
+        const int rc = ares_library_init(0);
+        check_ares_error(rc);
+    }
+    ~ares_initializer() {
+        ares_library_cleanup();
+    }
+};
+
+/**
+ * Implementation of dns_resolver.
+ *
+ * Owns the c-ares channel and emulates BSD-style non-blocking sockets on
+ * top of seastar, so c-ares can drive its state machine through the
+ * socket functions installed in the constructor. The fds handed to
+ * c-ares are small integers (see next_fd()) that key into _sockets.
+ * Readiness is tracked per entry in sock_entry::avail (POLLIN/POLLOUT);
+ * while a seastar operation is pending we report EWOULDBLOCK and re-run
+ * poll_sockets() when it completes.
+ */
+class seastar::net::dns_resolver::impl
+    : public enable_shared_from_this<impl>
+{
+public:
+    /**
+     * Sets up the c-ares channel from @p opts and installs our socket
+     * functions on it.
+     *
+     * @throws std::system_error (c-ares category) if c-ares cannot be
+     *         initialized; std::invalid_argument if a configured server
+     *         is not an ipv4 address.
+     */
+    impl(::network_stack& stack, const options& opts)
+        : _stack(stack)
+        , _timeout(opts.timeout ? *opts.timeout : std::chrono::milliseconds(5000) /* from ares private */)
+        , _timer(std::bind(&impl::poll_sockets, this))
+    {
+        static const ares_initializer a_init;
+
+        // this can "block" ever so slightly, because it will
+        // look in resolv.conf etc for query setup. We could
+        // do this ourselves, and instead set ares options
+        // here, but it seems more error prone (me parsing
+        // resolv.conf -> hah!)
+        ares_options a_opts = { 0, };
+        // TODO:
+        // We only allow querying dns server. We ignore hosts
+        // files. Either we need to add virtual file calls
+        // to c-ares (huge!)
+        // or we do our own read+(partial)parse of
+        // hosts files.
+        // Big downside here right now is that we can essentially
+        // get different answers on a hosts-file based system.
+        char buf[2] = "b";
+        a_opts.lookups = buf; // only net
+        // Always set the timeout
+        a_opts.timeout = _timeout.count();
+        int flags = ARES_OPT_LOOKUPS|ARES_OPT_TIMEOUTMS;
+
+        if (opts.use_tcp_query && *opts.use_tcp_query) {
+            a_opts.flags = ARES_FLAG_USEVC | ARES_FLAG_PRIMARY;
+            flags |= ARES_OPT_FLAGS;
+        }
+        // Backing storage for a_opts.servers; kept alive until after
+        // ares_init_options() below.
+        std::vector<in_addr> addr_tmp;
+        if (opts.servers) {
+            std::transform(opts.servers->begin(), opts.servers->end(), std::back_inserter(addr_tmp), [](const inet_address& a) {
+                if (a.in_family != inet_address::family::INET) {
+                    throw std::invalid_argument("Servers must be ipv4 addresses");
+                }
+                in_addr in = a;
+                return in;
+            });
+            a_opts.servers = addr_tmp.data();
+            a_opts.nservers = int(addr_tmp.size());
+            flags |= ARES_OPT_SERVERS;
+        }
+        // Backing storage for a_opts.domains (pointers into opts.domains).
+        std::vector<const char *> dom_tmp;
+        if (opts.domains) {
+            std::transform(opts.domains->begin(), opts.domains->end(), std::back_inserter(dom_tmp), [](const sstring& s) {
+                return s.data();
+            });
+            a_opts.domains = const_cast<char **>(dom_tmp.data());
+            a_opts.ndomains = int(dom_tmp.size());
+            flags |= ARES_OPT_DOMAINS;
+        }
+        if (opts.tcp_port) {
+            a_opts.tcp_port = *opts.tcp_port;
+            flags |= ARES_OPT_TCP_PORT;
+        }
+        if (opts.udp_port) {
+            a_opts.udp_port = *opts.udp_port;
+            flags |= ARES_OPT_UDP_PORT;
+        }
+
+        check_ares_error(ares_init_options(&_channel, &a_opts, flags));
+
+        // The user-data pointer registered below is "this"; route every
+        // c-ares socket call back to the matching member function.
+        static auto get_impl = [](void * p) { return reinterpret_cast<impl *>(p); };
+        static const ares_socket_functions callbacks = {
+            [](int af, int type, int protocol, void * p) { return get_impl(p)->do_socket(af, type, protocol); },
+            [](ares_socket_t s, void * p) { return get_impl(p)->do_close(s); },
+            [](ares_socket_t s, const struct sockaddr * addr, socklen_t len, void * p) { return get_impl(p)->do_connect(s, addr, len); },
+            [](ares_socket_t s, void * dst, size_t len, int flags, struct sockaddr * addr, socklen_t * alen, void * p) {
+                return get_impl(p)->do_recvfrom(s, dst, len, flags, addr, alen);
+            },
+            [](ares_socket_t s, const struct iovec * vec, int len, void * p) {
+                return get_impl(p)->do_sendv(s, vec, len);
+            },
+        };
+
+        ares_set_socket_functions(_channel, &callbacks, this);
+
+        // just in case you need printf-debug.
+        //dns_log.set_level(seastar::log_level::trace);
+    }
+    ~impl() {
+        _timer.cancel();
+        if (_channel) {
+            ares_destroy(_channel);
+        }
+    }
+
+    /**
+     * Resolves @p name to its primary address.
+     * Fails with ARES_ENODATA if the answer contains no usable address.
+     */
+    future<inet_address> resolve_name(sstring name, inet_address::family family) {
+        return get_host_by_name(std::move(name), family).then([](hostent h) {
+            if (h.addr_list.empty()) {
+                // front() on an empty list would be undefined behavior.
+                throw std::system_error(ARES_ENODATA, ares_errorc);
+            }
+            return make_ready_future<inet_address>(h.addr_list.front());
+        });
+    }
+
+    /**
+     * Starts a c-ares name lookup. The promise is owned by c-ares until
+     * host_callback() fulfills and deletes it.
+     */
+    future<hostent> get_host_by_name(sstring name, inet_address::family family) {
+        auto p = new promise<hostent>();
+        auto f = p->get_future();
+
+        dns_log.debug("Query name {} ({})", name, family);
+
+        dns_call call(*this);
+
+        ares_gethostbyname(_channel, name.c_str(), int(family), &impl::host_callback, reinterpret_cast<void *>(p));
+
+        poll_sockets();
+
+        // Keep ourselves alive (rather than capturing a raw "this"): the
+        // owning dns_resolver may be destroyed before the query ends.
+        return f.finally([me = shared_from_this()] {
+            me->end_call();
+        });
+    }
+
+    /**
+     * Starts a c-ares reverse lookup of @p addr. The promise is owned by
+     * c-ares until host_callback() fulfills and deletes it.
+     */
+    future<hostent> get_host_by_addr(inet_address addr) {
+        auto p = new promise<hostent>();
+        auto f = p->get_future();
+
+        dns_log.debug("Query addr {}", addr);
+
+        dns_call call(*this);
+
+        ares_gethostbyaddr(_channel, &addr.in, addr.size(), int(addr.in_family), &impl::host_callback, reinterpret_cast<void *>(p));
+
+        poll_sockets();
+
+        // See get_host_by_name(): hold a reference, not a raw pointer.
+        return f.finally([me = shared_from_this()] {
+            me->end_call();
+        });
+    }
+
+    /** Resolves @p addr to its primary name. */
+    future<sstring> resolve_addr(inet_address addr) {
+        return get_host_by_addr(addr).then([](hostent h) {
+            return make_ready_future<sstring>(h.names.front());
+        });
+    }
+
+    /**
+     * Cancels all queries, closes every socket and waits (via the gate)
+     * for in-flight socket operations to finish.
+     */
+    future<> close() {
+        _closed = true;
+        ares_cancel(_channel);
+        dns_log.trace("Shutting down {} sockets", _sockets.size());
+        // do_close() can erase entries from _sockets (through release()),
+        // so snapshot the fds first rather than iterating the map while
+        // it is being modified.
+        std::vector<ares_socket_t> fds;
+        fds.reserve(_sockets.size());
+        for (auto & p : _sockets) {
+            fds.push_back(p.first);
+        }
+        for (auto fd : fds) {
+            do_close(fd);
+        }
+        dns_log.trace("Closing gate");
+        return _gate.close();
+    }
+private:
+    enum class type {
+        none, tcp, udp
+    };
+    /**
+     * Scope guard created for every query: bumps the in-flight call
+     * count. On scope exit it arms the periodic poll timer if this was
+     * the first concurrent call and calls are still outstanding. The
+     * matching decrement is end_call().
+     */
+    struct dns_call {
+        dns_call(impl & i)
+            : _i(i)
+            , _c(++i._calls)
+        {}
+        ~dns_call() {
+            // If a query does not immediately complete
+            // it might never do so, unless data actually
+            // comes back to us and a waiting recv promise
+            // is fulfilled.
+            // We need to add a timer to do polling at ~timeout
+            // ms later, so the ares logic can detect this and
+            // tell us we're over.
+            if (_c == 1 && _i._calls != 0) {
+                _i._timer.arm_periodic(_i._timeout);
+            }
+        }
+        impl& _i;
+        uint64_t _c;
+    };
+
+    // Ends a query started under a dns_call; stops polling when none remain.
+    void end_call() {
+        if (--_calls == 0) {
+            _timer.cancel();
+        }
+    }
+    /**
+     * Lets c-ares process every socket it wants to read/write that we
+     * know is ready (per sock_entry::avail); sockets with a seastar
+     * operation still pending are masked out. Repeats while at least one
+     * ready socket was handed to ares_process().
+     */
+    void poll_sockets() {
+        fd_set readers, writers;
+        int n = 0;
+
+        dns_log.trace("Poll sockets");
+
+        do {
+            // Retrieve the set of file descriptors that the library wants us to monitor.
+            FD_ZERO(&readers);
+            FD_ZERO(&writers);
+
+            n = ares_fds(_channel, &readers, &writers);
+
+            dns_log.trace("ares_fds: {}", n);
+
+            if (n == 0) {
+                break;
+            }
+
+            n = 0;
+
+            for (auto & p : _sockets) {
+                auto & e = p.second;
+                auto fd = p.first;
+                auto r = FD_ISSET(p.first, &readers);
+                auto w = FD_ISSET(p.first, &writers);
+                auto ra = e.avail & POLLIN;
+                auto wa = e.avail & POLLOUT;
+
+                dns_log.trace("fd {} {}{}/{}{}", fd, (r ? "r" : ""),
+                                (w ? "w" : ""), (ra ? "r" : ""),
+                                (wa ? "w" : ""));
+
+                if (!wa) {
+                    FD_CLR(fd, &writers);
+                }
+                if (!ra) {
+                    FD_CLR(fd, &readers);
+                }
+                if (FD_ISSET(fd, &writers) || FD_ISSET(fd, &readers)) {
+                    ++n;
+                }
+            }
+
+            ares_process(_channel, &readers, &writers);
+        } while (n != 0);
+    }
+
+    /**
+     * Completion callback shared by name and address queries. @p arg is
+     * the heap-allocated promise<hostent>, which is always deleted here.
+     * No exception may escape, since we are called from c-ares (C code).
+     */
+    static void host_callback(void* arg, int status, int /*timeouts*/, ::hostent* host) {
+        std::unique_ptr<promise<hostent>> p(reinterpret_cast<promise<hostent> *>(arg));
+
+        if (status != ARES_SUCCESS) {
+            dns_log.debug("Query failed: {}", status);
+            p->set_exception(std::system_error(status, ares_errorc));
+            return;
+        }
+        try {
+            p->set_value(make_hostent(*host));
+        } catch (...) {
+            p->set_exception(std::current_exception());
+        }
+    }
+
+    /** Converts a C ::hostent into our hostent (ipv4/ipv6 addresses only). */
+    static hostent make_hostent(const ::hostent& host) {
+        hostent e;
+        e.names.emplace_back(host.h_name);
+        auto np = host.h_aliases;
+        while (*np != 0) {
+            e.names.emplace_back(*np++);
+        }
+        auto p = host.h_addr_list;
+        while (*p != nullptr) {
+            switch (host.h_addrtype) {
+            case AF_INET:
+                assert(size_t(host.h_length) >= sizeof(in_addr));
+                e.addr_list.emplace_back(*reinterpret_cast<const in_addr*>(*p));
+                break;
+            case AF_INET6:
+                assert(size_t(host.h_length) >= sizeof(in6_addr));
+                e.addr_list.emplace_back(*reinterpret_cast<const in6_addr*>(*p));
+                break;
+            default:
+                break;
+            }
+            ++p;
+        }
+
+        if (e.addr_list.empty()) {
+            dns_log.debug("Query success: {} (no addresses)", e.names.front());
+        } else {
+            dns_log.debug("Query success: {}/{}", e.names.front(), e.addr_list.front());
+        }
+
+        return e;
+    }
+    // We need to partially ref-count our socket entries
+    // when we have pending reads/writes, so we don't erase the
+    // entry too early.
+    void use(ares_socket_t fd) {
+        auto& e = _sockets.at(fd);
+        _gate.enter();
+        ++e.pending;
+    }
+    // Drops one reference taken by use() or do_close(); the entry is
+    // erased once do_close() has run and no operation is pending.
+    void release(ares_socket_t fd) {
+        auto& e = _sockets.at(fd);
+        dns_log.trace("Release socket {} -> {}", fd, e.pending - 1);
+        if (--e.pending < 0) {
+            _sockets.erase(fd);
+            dns_log.trace("Released socket {}", fd);
+        }
+        _gate.leave();
+    }
+    // c-ares socket(): returns a new virtual fd, or -1 on failure.
+    ares_socket_t do_socket(int af, int type, int protocol) {
+        if (_closed) {
+            return -1;
+        }
+        try {
+            int fd = next_fd();
+            switch (type) {
+            case SOCK_STREAM:
+                _sockets.emplace(fd, connected_socket());
+                dns_log.trace("Created tcp socket {}", fd);
+                break;
+            case SOCK_DGRAM:
+                _sockets.emplace(fd, _stack.make_udp_channel());
+                dns_log.trace("Created udp socket {}", fd);
+                break;
+            default: return -1;
+            }
+            return fd;
+        } catch (...) {
+            // Never let an exception escape into c-ares.
+            dns_log.debug("Socket creation failed: {}", std::current_exception());
+            return -1;
+        }
+    }
+    // c-ares close(): marks the socket closed and tears it down; the
+    // entry itself is erased by release() once nothing is pending.
+    void do_close(ares_socket_t fd) {
+        dns_log.trace("Close socket {}", fd);
+        // Called from c-ares as well as close(); tolerate fds we have
+        // already released instead of throwing out of at().
+        auto i = _sockets.find(fd);
+        if (i == _sockets.end()) {
+            return;
+        }
+        auto& e = i->second;
+
+        // Mark as closed.
+        if (std::exchange(e.closed, true)) {
+            return;
+        }
+
+        _gate.enter(); // "leave" is done in release(fd)
+
+        switch (e.typ) {
+        case type::tcp:
+        {
+            dns_log.trace("Close tcp socket {}, {} pending", fd, e.pending);
+            future<> f = make_ready_future();
+            if (e.tcp.in) {
+                f = e.tcp.socket.shutdown_input().then([fd] {
+                    dns_log.trace("Closed tcp socket {} input", fd);
+                });
+            }
+            if (e.tcp.out) {
+                f = f.then([&e] {
+                    return e.tcp.out->close();
+                }).then([fd] {
+                    dns_log.trace("Closed tcp socket {} output", fd);
+                });
+            }
+            // Consume (and log) any shutdown/close error so it does not
+            // end up as an ignored exceptional future; always release.
+            f.then_wrapped([me = shared_from_this(), fd](future<> f) {
+                try {
+                    f.get();
+                } catch (...) {
+                    dns_log.debug("Close {} failed: {}", fd, std::current_exception());
+                }
+                me->release(fd);
+            });
+            break;
+        }
+        case type::udp:
+            e.udp.channel.close();
+            release(fd);
+            break;
+        default:
+            // should not happen
+            _gate.leave();
+            break;
+        }
+    }
+    // Converts a posix sockaddr to a seastar socket_address (ipv4 only).
+    socket_address sock_addr(const sockaddr * addr, socklen_t len) {
+        if (addr->sa_family != AF_INET) {
+            throw std::invalid_argument("No ipv6 yet");
+        }
+        auto in = reinterpret_cast<const sockaddr_in *>(addr);
+        return *in;
+    }
+    // c-ares connect(): tcp connects asynchronously (EWOULDBLOCK while
+    // pending); udp only records the destination.
+    int do_connect(ares_socket_t fd, const sockaddr * addr, socklen_t len) {
+        if (_closed) {
+            return -1;
+        }
+        try {
+            auto& e = get_socket_entry(fd);
+            auto sa = sock_addr(addr, len);
+
+            dns_log.trace("Connect {}({})->{}", fd, int(e.typ), sa);
+
+            assert(e.avail == 0);
+
+            e.avail = POLLOUT|POLLIN; // until we know otherwise
+
+            switch (e.typ) {
+            case type::tcp: {
+                auto f = _stack.connect(sa);
+                if (!f.available()) {
+                    dns_log.trace("Connection pending: {}", fd);
+                    e.avail = 0;
+                    use(fd);
+                    f.then_wrapped([me = shared_from_this(), &e, fd](future<connected_socket> f) {
+                        try {
+                            e.tcp.socket = f.get0();
+                            e.tcp.connected = true;
+                            dns_log.trace("Connection complete: {}", fd);
+                        } catch (...) {
+                            // e.tcp.connected stays false, so later I/O
+                            // on this fd fails instead of using an
+                            // unconnected socket.
+                            dns_log.debug("Connect {} failed: {}", fd, std::current_exception());
+                        }
+                        e.avail = POLLOUT|POLLIN;
+                        me->poll_sockets();
+                        me->release(fd);
+                    });
+                    errno = EWOULDBLOCK;
+                    return -1;
+                }
+                e.tcp.socket = f.get0();
+                e.tcp.connected = true;
+                break;
+            }
+            case type::udp:
+                // we do not have udp connect, so just keep
+                // track of the destination
+                e.udp.dst = sa;
+                break;
+            default:
+                return -1;
+            }
+            return 0;
+        } catch (...) {
+            return -1;
+        }
+    }
+    // c-ares recvfrom(): returns buffered data, 0 on tcp EOF, or -1 with
+    // errno = EWOULDBLOCK while a seastar read is still pending.
+    ssize_t do_recvfrom(ares_socket_t fd, void * dst, size_t len, int flags, struct sockaddr * from, socklen_t * from_len) {
+        if (_closed) {
+            return -1;
+        }
+        try {
+            auto& e = get_socket_entry(fd);
+            dns_log.trace("Read {}({})", fd, int(e.typ));
+            // check if we're already reading.
+            if (!(e.avail & POLLIN)) {
+                dns_log.trace("Read already pending {}", fd);
+                errno = EWOULDBLOCK;
+                return -1;
+            }
+            for (;;) {
+                switch (e.typ) {
+                case type::tcp: {
+                    auto & tcp = e.tcp;
+                    if (!tcp.indata.empty()) {
+                        dns_log.trace("Read {}. {} bytes available", fd, tcp.indata.size());
+                        len = std::min(len, tcp.indata.size());
+                        std::copy(tcp.indata.begin(), tcp.indata.begin() + len, reinterpret_cast<char *>(dst));
+                        tcp.indata.trim_front(len);
+                        return len;
+                    }
+                    if (!tcp.connected) {
+                        errno = ENOTCONN;
+                        return -1;
+                    }
+                    if (!tcp.in) {
+                        tcp.in = tcp.socket.input();
+                    }
+                    auto f = tcp.in->read_up_to(len);
+                    if (!f.available()) {
+                        dns_log.trace("Read {}: data unavailable", fd);
+                        e.avail &= ~POLLIN;
+                        use(fd);
+                        f.then_wrapped([me = shared_from_this(), &e, fd](future<temporary_buffer<char>> f) {
+                            try {
+                                auto buf = f.get0();
+                                dns_log.trace("Read {} -> {} bytes", fd, buf.size());
+                                e.tcp.indata = std::move(buf);
+                            } catch (...) {
+                                dns_log.debug("Read {} failed: {}", fd, std::current_exception());
+                            }
+                            e.avail |= POLLIN; // always reset state
+                            me->poll_sockets();
+                            me->release(fd);
+                        });
+                        errno = EWOULDBLOCK;
+                        return -1;
+                    }
+
+                    try {
+                        tcp.indata = f.get0();
+                    } catch (std::system_error& ex) {
+                        errno = ex.code().value();
+                        return -1;
+                    } catch (...) {
+                        return -1;
+                    }
+                    if (tcp.indata.empty()) {
+                        // EOF: report it like recv(2) does. Looping here
+                        // would re-read the exhausted stream forever.
+                        return 0;
+                    }
+                    continue; // loop will take care of data
+                }
+                case type::udp: {
+                    auto & udp = e.udp;
+                    if (udp.in) {
+                        auto & p = udp.in->get_data();
+
+                        dns_log.trace("Read {}. {} bytes available from {}", fd, p.len(), udp.in->get_src());
+
+                        if (from != nullptr) {
+                            *from = socket_address(udp.in->get_src()).as_posix_sockaddr();
+                            if (from_len != nullptr) {
+                                // TODO: ipv6
+                                *from_len = sizeof(sockaddr_in);
+                            }
+                        }
+
+                        len = std::min(len, size_t(p.len()));
+                        size_t rem = len;
+                        auto * out = reinterpret_cast<char *>(dst);
+                        for (auto & f : p.fragments()) {
+                            auto n = std::min(rem, f.size);
+                            out = std::copy_n(f.base, n, out);
+                            rem = rem - n;
+                        }
+                        // Datagram semantics, as with recvfrom(2) on a
+                        // SOCK_DGRAM socket: whatever did not fit is
+                        // discarded, not returned as a bogus next packet.
+                        udp.in = {};
+                        return len;
+                    }
+                    auto f = udp.channel.receive();
+                    if (!f.available()) {
+                        e.avail &= ~POLLIN;
+                        use(fd);
+                        dns_log.trace("Read {}: data unavailable", fd);
+                        f.then_wrapped([me = shared_from_this(), &e, fd](future<::net::udp_datagram> f) {
+                            try {
+                                auto d = f.get0();
+                                dns_log.trace("Read {} -> {} bytes", fd, d.get_data().len());
+                                e.udp.in = std::move(d);
+                                e.avail |= POLLIN;
+                            } catch (...) {
+                                dns_log.debug("Read {} failed: {}", fd, std::current_exception());
+                            }
+                            me->poll_sockets();
+                            me->release(fd);
+                        });
+                        errno = EWOULDBLOCK;
+                        return -1;
+                    }
+
+                    try {
+                        udp.in = f.get0();
+                        continue; // loop will take care of data
+                    } catch (std::system_error& ex) {
+                        errno = ex.code().value();
+                        return -1;
+                    } catch (...) {
+                    }
+                    return -1;
+                }
+                default:
+                    return -1;
+                }
+            }
+        } catch (...) {
+        }
+        return -1;
+    }
+    // c-ares writev(): returns the number of bytes accepted, or -1.
+    ssize_t do_sendv(ares_socket_t fd, const iovec * vec, int len) {
+        if (_closed) {
+            return -1;
+        }
+        try {
+            auto& e = _sockets.at(fd);
+            dns_log.trace("Send {}({})", fd, int(e.typ));
+
+            // Assume we will be able to send data eventually very soon
+            // and just assume that unless we get immediate
+            // failures, we'll be ok. If we're not, the
+            // timeout logic will have to handle the problem.
+            //
+            // This saves us on two accounts:
+            // 1.) c-ares does not handle EWOULDBLOCK for
+            //     udp sockets. Must pretend to finish
+            //     immediately there anyway
+            // 2.) Doing so for tcp writes saves us having to
+            //     match iovec->packet fragments. Downside is we
+            //     have to copy the data, but we pretty much
+            //     have to anyway, since we could otherwise
+            //     get a query time out while we're sending
+            //     with zero-copy and suddenly have freed
+            //     memory in packets. Bad.
+
+            // check if we're already writing.
+            if (e.typ == type::tcp && !(e.avail & POLLOUT)) {
+                dns_log.trace("Send already pending {}", fd);
+                errno = EWOULDBLOCK;
+                return -1;
+            }
+            if (e.typ == type::tcp && !e.tcp.connected) {
+                errno = ENOTCONN;
+                return -1;
+            }
+
+            ::net::packet p;
+            p.reserve(len);
+            for (int i = 0; i < len; ++i) {
+                p = ::net::packet(std::move(p), ::net::fragment{reinterpret_cast<char *>(vec[i].iov_base), vec[i].iov_len});
+            }
+
+            // Like writev(2) we must report bytes, not iovec entries:
+            // c-ares advances its tcp send queue by this amount.
+            auto bytes = p.len();
+            auto f = make_ready_future();
+
+            switch (e.typ) {
+            case type::tcp:
+                if (!e.tcp.out) {
+                    e.tcp.out = e.tcp.socket.output();
+                }
+                f = e.tcp.out->write(std::move(p));
+                break;
+            case type::udp:
+                f = e.udp.channel.send(e.udp.dst, std::move(p));
+                break;
+            default:
+                return -1;
+            }
+
+            if (!f.available()) {
+                dns_log.trace("Send {} unavailable.", fd);
+                e.avail &= ~POLLOUT;
+                use(fd);
+                f.then_wrapped([me = shared_from_this(), &e, bytes, fd](future<> f) {
+                    try {
+                        f.get();
+                        dns_log.trace("Send {}. {} bytes sent.", fd, bytes);
+                    } catch (...) {
+                        dns_log.debug("Send {} failed: {}", fd, std::current_exception());
+                    }
+                    e.avail |= POLLOUT;
+                    me->poll_sockets();
+                    me->release(fd);
+                });
+                // c-ares does _not_ use non-blocking retry for udp sockets. We just pretend
+                // all is fine even though we have no idea. Barring stack/adapter failure it
+                // is close to the same guarantee a "normal" message send would have anyway.
+                // For tcp we also pretend we're done, to make sure we don't have to deal with
+                // matching sent data
+                return ssize_t(bytes);
+            }
+            if (f.failed()) {
+                try {
+                    f.get();
+                } catch (std::system_error& ex) {
+                    errno = ex.code().value();
+                } catch (...) {
+                }
+                return -1;
+            }
+
+            return ssize_t(bytes);
+        } catch (...) {
+        }
+        return -1;
+    }
+    // Smallest unused virtual fd, starting at _sockets.size() + 1.
+    int next_fd() {
+        int fd = int(_sockets.size() + 1);
+        while (_sockets.count(fd)) {
+            ++fd;
+        }
+        return fd;
+    }
+    struct tcp_entry {
+        tcp_entry(connected_socket s)
+            : socket(std::move(s)) {
+        }
+        connected_socket socket;
+        std::experimental::optional<input_stream<char>> in;
+        std::experimental::optional<output_stream<char>> out;
+        temporary_buffer<char> indata;
+        // Set once a connect has succeeded; I/O on a socket that never
+        // connected must fail rather than touch the empty socket.
+        bool connected = false;
+    };
+    struct udp_entry {
+        udp_entry(::net::udp_channel c)
+            : channel(std::move(c)) {
+        }
+        ::net::udp_channel channel;
+        std::experimental::optional<::net::udp_datagram> in;
+        socket_address dst;
+    };
+    // Tagged union of a tcp or udp socket plus emulated readiness state.
+    struct sock_entry {
+        union {
+            tcp_entry tcp;
+            udp_entry udp;
+        };
+        type typ;
+        int avail = 0;
+        int pending = 0;
+        bool closed = false;
+
+        sock_entry(sock_entry&& e)
+            : typ(e.typ)
+            , avail(e.avail)
+            , pending(e.pending)
+            , closed(e.closed)
+        {
+            // No union member is constructed by the initializer list, so
+            // move-construct the active one in place (assigning would act
+            // on uninitialized storage). The source keeps its type so its
+            // destructor still destroys its moved-from member.
+            switch (typ) {
+            case type::tcp:
+                new (&tcp) tcp_entry(std::move(e.tcp));
+                break;
+            case type::udp:
+                new (&udp) udp_entry(std::move(e.udp));
+                break;
+            default:
+                break;
+            }
+        }
+        sock_entry(connected_socket s)
+            : tcp(tcp_entry{std::move(s)})
+            , typ(type::tcp)
+        {}
+        sock_entry(::net::udp_channel c)
+            : udp(udp_entry{std::move(c)})
+            , typ(type::udp)
+        {}
+        ~sock_entry() {
+            switch (typ) {
+            case type::tcp: tcp.~tcp_entry(); break;
+            case type::udp: udp.~udp_entry(); break;
+            default: break;
+            }
+        }
+    };
+
+    // Looks up an open socket; throws if unknown or already closed.
+    sock_entry& get_socket_entry(ares_socket_t fd) {
+        auto& e = _sockets.at(fd);
+        if (e.closed) {
+            throw std::runtime_error("Socket closed");
+        }
+        return e;
+    }
+
+
+    typedef std::unordered_map<ares_socket_t, sock_entry> socket_map;
+
+    friend struct dns_call;
+
+    socket_map _sockets;
+    ::network_stack & _stack;
+
+    ares_channel _channel = {};
+    uint64_t _calls = 0;
+    std::chrono::milliseconds _timeout;
+    timer<> _timer;
+    gate _gate;
+    bool _closed = false;
+};
+
+namespace seastar {
+namespace net {
+
+// Default options on the engine's network stack.
+dns_resolver::dns_resolver()
+    : dns_resolver(options{})
+{}
+
+// Given options on the engine's network stack.
+dns_resolver::dns_resolver(const options& opts)
+    : dns_resolver(engine().net(), opts)
+{}
+
+dns_resolver::dns_resolver(network_stack& stack, const options& opts)
+    : _impl(::make_shared<impl>(stack, opts))
+{}
+
+dns_resolver::~dns_resolver() = default;
+
+dns_resolver::dns_resolver(dns_resolver&&) = default;
+dns_resolver& dns_resolver::operator=(dns_resolver&&) = default;
+
+// Everything below forwards to the shared impl; a missing family means ipv4.
+future<hostent> dns_resolver::get_host_by_name(const sstring& name, opt_family family) {
+    const auto fam = family ? *family : inet_address::family::INET;
+    return _impl->get_host_by_name(name, fam);
+}
+
+future<hostent> dns_resolver::get_host_by_addr(const inet_address& addr) {
+    return _impl->get_host_by_addr(addr);
+}
+
+future<inet_address> dns_resolver::resolve_name(const sstring& name, opt_family family) {
+    const auto fam = family ? *family : inet_address::family::INET;
+    return _impl->resolve_name(name, fam);
+}
+
+future<sstring> dns_resolver::resolve_addr(const inet_address& addr) {
+    return _impl->resolve_addr(addr);
+}
+
+future<> dns_resolver::close() {
+    return _impl->close();
+}
+
+}
+}
+
+// Shard-local resolver backing the seastar::net::dns free functions;
+// created on first use with the default stack and default options.
+static seastar::net::dns_resolver& resolver() {
+    static thread_local seastar::net::dns_resolver instance;
+    return instance;
+}
+
+// The wrappers pass the optional family straight through; the member
+// functions apply the same ipv4 default.
+future<seastar::net::hostent> seastar::net::dns::get_host_by_name(const sstring& name, opt_family family) {
+    auto& r = resolver();
+    return r.get_host_by_name(name, family);
+}
+
+future<seastar::net::hostent> seastar::net::dns::get_host_by_addr(const inet_address& addr) {
+    auto& r = resolver();
+    return r.get_host_by_addr(addr);
+}
+
+future<seastar::net::inet_address> seastar::net::dns::resolve_name(const sstring& name, opt_family family) {
+    auto& r = resolver();
+    return r.resolve_name(name, family);
+}
+
+future<sstring> seastar::net::dns::resolve_addr(const inet_address& addr) {
+    auto& r = resolver();
+    return r.resolve_addr(addr);
+}
diff --git a/tests/dns_test.cc b/tests/dns_test.cc
new file mode 100644
index 0000000..2999d0e
--- /dev/null
+++ b/tests/dns_test.cc
@@ -0,0 +1,105 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Copyright (C) 2016 ScyllaDB.
+ */
+#include <vector>
+#include <algorithm>
+
+#include "core/do_with.hh"
+#include "test-utils.hh"
+#include "core/sstring.hh"
+#include "core/reactor.hh"
+#include "core/do_with.hh"
+#include "core/future-util.hh"
+#include "net/dns.hh"
+#include "net/inet_address.hh"
+
+using namespace seastar;
+using namespace seastar::net;
+
+// Well-known public host used by the resolve tests (needs network access).
+static const inet_address google_addr = inet_address("216.58.201.164");
+static const sstring google_name = "www.google.com";
+
+// Round trip: name -> address -> name -> address. The last lookup must
+// contain the address the first one returned.
+static future<> test_resolve(dns_resolver::options opts) {
+    auto resolver = ::make_lw_shared<dns_resolver>(std::move(opts));
+    return resolver->get_host_by_name(google_name, inet_address::family::INET).then([resolver](hostent by_name) {
+        //BOOST_REQUIRE(std::count(by_name.addr_list.begin(), by_name.addr_list.end(), google_addr));
+        auto first = by_name.addr_list.front();
+        return resolver->get_host_by_addr(first).then([resolver, first](hostent by_addr) {
+            return resolver->get_host_by_name(by_addr.names.front(), inet_address::family::INET).then([first](hostent again) {
+                BOOST_REQUIRE(std::count(again.addr_list.begin(), again.addr_list.end(), first));
+            });
+        });
+    }).finally([resolver] {
+        return resolver->close();
+    });
+}
+
+// Looking up a nonexistent name must fail.
+static future<> test_bad_name(dns_resolver::options opts) {
+    auto d = ::make_lw_shared<dns_resolver>(std::move(opts));
+    return d->get_host_by_name("apa.ninja.gnu", inet_address::family::INET).then_wrapped([d](future<hostent> f) {
+        try {
+            f.get();
+        } catch (...) {
+            return; // expected: the lookup failed
+        }
+        // Outside the try block on purpose: BOOST_FAIL throws, and a
+        // surrounding catch (...) would silently swallow the failure.
+        BOOST_FAIL("should not succeed");
+    }).finally([d]{
+        return d->close();
+    });
+}
+
+// Default options query over udp.
+SEASTAR_TEST_CASE(test_resolve_udp) {
+    return test_resolve(dns_resolver::options{});
+}
+
+SEASTAR_TEST_CASE(test_bad_name_udp) {
+    return test_bad_name(dns_resolver::options{});
+}
+
+// Querying an address where no DNS server listens must fail (time out).
+SEASTAR_TEST_CASE(test_timeout_udp) {
+    dns_resolver::options opts;
+    opts.servers = std::vector<inet_address>({ inet_address("1.2.3.4") }); // not a server
+    opts.timeout = std::chrono::milliseconds(500);
+
+    auto d = ::make_lw_shared<dns_resolver>(engine().net(), opts);
+    return d->get_host_by_name(google_name, inet_address::family::INET).then_wrapped([d](future<hostent> f) {
+        try {
+            f.get();
+        } catch (...) {
+            return; // expected: the query failed
+        }
+        // Outside the try block on purpose: BOOST_FAIL throws, and a
+        // surrounding catch (...) would silently swallow the failure.
+        BOOST_FAIL("should not succeed");
+    }).finally([d]{
+        return d->close();
+    });
+}
+
+// Options that force queries over tcp.
+static dns_resolver::options tcp_options() {
+    dns_resolver::options opts;
+    opts.use_tcp_query = true;
+    return opts;
+}
+
+SEASTAR_TEST_CASE(test_resolve_tcp) {
+    return test_resolve(tcp_options());
+}
+
+SEASTAR_TEST_CASE(test_bad_name_tcp) {
+    return test_bad_name(tcp_options());
+}
+
--
1.9.1