Example of a standalone Vertx http client (that uses long polling)


Rick Hight

Oct 9, 2014, 4:18:04 AM
to ve...@googlegroups.com
I wrote a little HTTP client for etcd using vertx.

If you want an example of how to use the Vert.x HTTP client, it might not be
a bad one.

https://github.com/boonproject/boon/blob/master/etcd/README.md

*Boon etcd is a Java client for etcd.*

*What is etcd?*

etcd is a highly-available key value store for shared configuration and
service discovery.

etcd is part of the CoreOS project.

etcd is inspired by Apache ZooKeeper and doozer, with a focus on being:

Simple: REST like and curl'able user facing API (HTTP+JSON); Secure:
optional SSL client cert authentication; Fast: benchmarked 1000s of
writes/s per instance; Reliable: properly distributed using Raft.

etcd is written in Go and uses the Raft consensus algorithm to manage a
highly-available replicated log.

You can learn more about etcd at https://github.com/coreos/etcd.

*Boon etcd client for Java*

Unlike most etcd Java clients (perhaps all), it supports wait, which lets you
wait for a key or a key directory to change. Also unlike most etcd Java
clients, it supports both async and sync modes. Like all boon projects, it is
easy to use and fast. :)
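
To make the wait feature concrete: in the client code later in this post, a wait is an HTTP GET on /v2/keys/ with wait=true, optionally combined with recursive=true and waitIndex. Here is a minimal sketch of just that URI construction. The class and method names are hypothetical and are not part of boon.

```java
final class WaitUriSketch {

    /**
     * Builds the long-polling URI for a key.
     * A null waitIndex means "wait for the next change only".
     */
    static String waitUri(String key, boolean recursive, Long waitIndex) {
        StringBuilder uri = new StringBuilder("/v2/keys/").append(key).append("?wait=true");
        if (recursive) {
            uri.append("&recursive=true");
        }
        if (waitIndex != null) {
            uri.append("&waitIndex=").append(waitIndex);
        }
        return uri.toString();
    }

    public static void main(String[] args) {
        System.out.println(waitUri("waitOnKey", false, null));
        System.out.println(waitUri("conf", true, 7L));
    }
}
```

The request simply stays open until etcd has a change to report, which is why the blocking wait calls in the client do not apply the normal response timeout.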

Rather than give you a very thin (hard to use) API that merely allows you
to pass the various flags and switches, we created a task-oriented Java API
so all of the most common tasks are at your fingertips.

There is an async version and a synchronous version of each method for your
convenience.
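
The blocking variants in the class listing further down are built on the async ones: the response handler drops the result into a one-slot BlockingQueue and the caller polls it with a timeout. The sketch below shows that pattern with only the JDK. The getAsync method is a hypothetical stand-in for a real asynchronous HTTP call, and all names here are illustrative.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class SyncOverAsyncSketch {

    /** Hypothetical async call: answers on another thread by invoking the handler. */
    static void getAsync(Consumer<String> handler, String key) {
        Thread worker = new Thread(() -> handler.accept("value-of-" + key));
        worker.setDaemon(true);
        worker.start();
    }

    /** Blocking variant: hand the async result over through a one-slot queue. */
    static String getBlocking(String key, long timeoutMillis) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        getAsync(queue::offer, key);
        try {
            String response = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (response == null) {
                throw new IllegalStateException("Response timeout for key=" + key);
            }
            return response;
        } catch (InterruptedException e) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for key=" + key, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(getBlocking("foo", 2000));
    }
}
```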

interface Etcd {


/** * Create a directory using async handler * @param responseHandler handler * @param name name of dir */
void createDir(Handler<Response> responseHandler, String name);

/** * Create a directory (blocking) * @param name name of dir * @return response */
Response createDir(String name);


/** * Create a temp directory, i.e., one with a time to live TTL * @param name name of dir * @param ttl ttl * @return */
Response createTempDir(String name, long ttl);

/** * Create a temp dir async. * @param responseHandler async handler * @param name name of dir * @param ttl time to live */
void createTempDir(Handler<Response> responseHandler, String name, long ttl);

/** * Update a directory's time to live. * @param name dir name (path) * @param ttl ttl * @return */
Response updateDirTTL(String name, long ttl);

/** * Update a directory's time to live. * @param responseHandler * @param name * @param ttl */
void updateDirTTL(Handler<Response> responseHandler, String name, long ttl);

/** * Delete a dir * @param name * @return */
Response deleteDir(String name);

/** * Delete a dir async. * @param responseHandler * @param name */
void deleteDir(Handler<Response> responseHandler, String name);


/** * Delete a dir and all of its children recursively. * @param name * @return */
Response deleteDirRecursively(String name);
void deleteDirRecursively(Handler<Response> responseHandler, String name);


/** * List keys and value * @param key * @return */
Response list(String key);


/** * List keys and values async * @param responseHandler * @param key */
void list(Handler<Response> responseHandler, String key);

/** * List dir recursively. * @param key * @return */
Response listRecursive(String key);
void listRecursive(Handler<Response> responseHandler, String key);

/** * List dir sorted for order so we can pull things out FIFO for job queuing. * @param key * @return */
Response listSorted(String key);
void listSorted(Handler<Response> responseHandler, String key);


/** * Set a key * @param key * @param value * @return */
Response set(String key, String value);
void set(Handler<Response> responseHandler, String key, String value);

/** * Add a config under this key * @param key * @param fileName * @return */
Response setConfigFile(String key, String fileName);
void setConfigFile(Handler<Response> responseHandler, String key, String fileName);

/** * Update the key with a new value if it already exists * @param key * @param value * @return */
Response setIfExists(String key, String value);
void setIfExists(Handler<Response> responseHandler, String key, String value);


/** * Create the new key value only if it does not already exist. * @param key * @param value * @return */
Response setIfNotExists(String key, String value);
void setIfNotExists(Handler<Response> responseHandler, String key, String value);

/** * Create a temporary value with ttl set * @param key * @param value * @param ttl * @return */
Response setTemp(String key, String value, int ttl);
void setTemp(Handler<Response> responseHandler, String key, String value, int ttl);

/** * Remove TTL from key/value * @param key * @param value * @return */
Response removeTTL(String key, String value);
void removeTTL(Handler<Response> responseHandler, String key, String value);


/** * Compare and swap if the previous value is the same * @param key * @param preValue * @param value * @return */
Response compareAndSwapByValue(String key, String preValue, String value);
void compareAndSwapByValue(Handler<Response> responseHandler, String key, String preValue, String value);

/** * Compare and swap if the modified index has not changed. * @param key * @param prevIndex * @param value * @return */
Response compareAndSwapByModifiedIndex(String key, long prevIndex, String value);
void compareAndSwapByModifiedIndex(Handler<Response> responseHandler, String key, long prevIndex, String value);


/** * Get the value * @param key * @return */
Response get(String key);
void get(Handler<Response> responseHandler, String key);


/** * Get the value and ensure it is consistent. (Slow but consistent) * @param key * @return */
Response getConsistent(String key);
void getConsistent(Handler<Response> responseHandler, String key);

/** * Wait for this key to change * @param key * @return */
Response wait(String key);
void wait(Handler<Response> responseHandler, String key);


/** * Wait for this key to change and you can ask for the past key value based on index just in case you missed it. * @param key * @param index * @return */
Response wait(String key, long index);
void wait(Handler<Response> responseHandler, String key, long index);


/** * Wait for this key to change and any key under this key dir recursively. * @param key * @return */
Response waitRecursive(String key);
void waitRecursive(Handler<Response> responseHandler, String key);


/** * Wait for this key to change and any key under this key dir recursively, and * ask for the past key value based on index just in case you missed it. * @param key * @param index * @return */
Response waitRecursive(String key, long index);
void waitRecursive(Handler<Response> responseHandler, String key, long index);

/** * Delete the key. * @param key * @return */
Response delete(String key);
void delete(Handler<Response> responseHandler, String key);

/** Delete the key only if it is at this index * * @param key * @param index * @return */
Response deleteIfAtIndex(String key, long index);
void deleteIfAtIndex(Handler<Response> responseHandler, String key, long index);

/** * Delete the value but only if it is at the previous value * @param key * @param prevValue * @return */
Response deleteIfValue(String key, String prevValue);
void deleteIfValue(Handler<Response> responseHandler, String key, String prevValue);
}

As you can see, the interface tries to spell out all of the main etcd
operations from the etcd tutorial. If we are missing any, let us know.
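
One operation worth illustrating is compareAndSwapByValue, since it is normally used in a read-modify-retry loop. The sketch below uses an in-memory ConcurrentHashMap as a stand-in for etcd, so it shows only the semantics and not the network call. The class is hypothetical and is not the boon client.

```java
import java.util.concurrent.ConcurrentHashMap;

final class CompareAndSwapSketch {

    // In-memory stand-in for the key/value store (assumption, not etcd itself).
    private final ConcurrentHashMap<String, String> store = new ConcurrentHashMap<>();

    void set(String key, String value) {
        store.put(key, value);
    }

    String get(String key) {
        return store.get(key);
    }

    /** Swaps only if the current value still equals prevValue. */
    boolean compareAndSwapByValue(String key, String prevValue, String value) {
        return store.replace(key, prevValue, value);
    }

    /** Optimistic increment: re-read and retry until the swap succeeds. */
    int increment(String key) {
        while (true) {
            String current = get(key);
            int next = Integer.parseInt(current) + 1;
            if (compareAndSwapByValue(key, current, Integer.toString(next))) {
                return next;
            }
        }
    }

    public static void main(String[] args) {
        CompareAndSwapSketch sketch = new CompareAndSwapSketch();
        sketch.set("counter", "1");
        System.out.println(sketch.increment("counter"));
        // A swap based on a stale value is rejected.
        System.out.println(sketch.compareAndSwapByValue("counter", "1", "9"));
    }
}
```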

You can use boon etcd client synchronously as follows:

Response response;

EtcdClient client = new EtcdClient("localhost", 4001);
response = client.get("foo");

puts(response);

response = client.set("foo", "Rick Was here");

puts(response);


response = client.get("foo");

puts(response);


response = client.delete("foo");


puts(response);


client.setTemp("tempKey", "tempValue", 5);

puts(client.get("tempKey").node().getValue());

Sys.sleep(1000);


puts(client.get("tempKey").node().getValue());

Sys.sleep(1000);


puts(client.get("tempKey").node().getValue());

Sys.sleep(4000);


puts(client.get("tempKey"));


Response waitOnKey = client.wait("waitOnKey");

puts("GOT KEY WE ARE WAITING ON", waitOnKey);

puts("Create a dir");

client.createDir("conf");


client.createDir("conf/foo1");

client.createDir("conf/foo2");


client.createDir("conf/foo3");


response = client.listRecursive("");

puts(response);


response = client.deleteDir("conf");


puts(response);

response = client.deleteDirRecursively("conf");


puts(response);


response = client.listRecursive("");

puts(response);

response = client.createDir("queue");
puts(response);

response = client.createDir("queue/job1");
puts(response);


response = client.set("queue/job1/mom", "mom");
puts(response);

response = client.createDir("queue/job29");
puts(response);


response = client.createDir("queue/job3");
puts(response);


response = client.listSorted("queue");
puts(response);
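
A note on the job queue at the end of that example. Assuming the sorted listing orders entries by key string (an assumption worth checking against the etcd docs), job29 comes back before job3. If you name jobs yourself and want numeric FIFO order, zero-padding the number is one way to get it. A small JDK-only sketch with hypothetical helper names:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

final class SortedKeysSketch {

    /** Returns the keys in plain string order, which is what we assume a sorted listing does. */
    static List<String> sortedKeys(Collection<String> keys) {
        List<String> sorted = new ArrayList<>(keys);
        Collections.sort(sorted);
        return sorted;
    }

    /** Zero-padded job key so that string order matches numeric order. */
    static String paddedJobKey(String dir, int jobNumber) {
        return String.format("%s/job%05d", dir, jobNumber);
    }

    public static void main(String[] args) {
        System.out.println(sortedKeys(Arrays.asList("queue/job29", "queue/job3", "queue/job1")));
        System.out.println(sortedKeys(Arrays.asList(
                paddedJobKey("queue", 29), paddedJobKey("queue", 3), paddedJobKey("queue", 1))));
    }
}
```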

Or you can use it asynchronously as follows:

Handler<Response> handler = new Handler<Response>() {
@Override
public void handle(Response event) {

if (event.node() != null) {
puts(event.action(), event.node().key(), event);
} else {
puts(event);
}
}
};

EtcdClient client = new EtcdClient("localhost", 4001);
client.get("foo");


client.set(handler, "foo", "Rick Was here");

Sys.sleep(1_000);


client.get(handler, "foo");

Sys.sleep(1_000);


client.delete(handler, "foo");

Sys.sleep(1_000);

client.setTemp(handler, "tempKey", "tempValue", 5);

Sys.sleep(1_000);

client.get(handler, "tempKey");

Sys.sleep(1000);


client.get(handler, "tempKey");

Sys.sleep(1000);


client.get(handler, "tempKey");

Sys.sleep(4000);


client.get(handler, "tempKey");


Sys.sleep(1000);

client.get(handler, "tempKey");

Sys.sleep(1000);


puts("WAITING ON KEY");

client.wait(handler, "waitOnKey");

Sys.sleep(10_000);

client.createDir(handler, "conf");

Sys.sleep(1000);

client.createDir(handler, "conf/foo1");
client.createDir(handler, "conf/foo2");
client.createDir(handler, "conf/foo3");

puts ("LIST RECURSIVE");
client.listRecursive(handler, "");


Sys.sleep(3_000);

client.deleteDir(handler, "conf");

Sys.sleep(1_000);


client.deleteDirRecursively(handler, "conf");
Sys.sleep(1_000);


client.listRecursive(handler, "");

Sys.sleep(1_000);

client.createDir(handler, "queue");
Sys.sleep(1_000);


client.createDir(handler, "queue");
Sys.sleep(1_000);


client.createDir(handler, "queue/job1");
Sys.sleep(1_000);


client.set(handler, "queue/job1/mom", "mom");
Sys.sleep(1_000);

client.createDir(handler, "queue/job29");
Sys.sleep(1_000);


client.createDir(handler, "queue/job3");
Sys.sleep(1_000);


client.listSorted(handler, "queue");
Sys.sleep(1_000);
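
Before the full class, it may help to see what a write such as setTemp puts on the wire. The client sends a PUT with Content-Type application/x-www-form-urlencoded and a body of URL-encoded name=value pairs. The sketch below reproduces only the body construction with the JDK. The class and method names are hypothetical.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

final class FormBodySketch {

    /** One URL-encoded form field, e.g. value=Rick+Was+here. */
    static String field(String name, String value) {
        try {
            return name + "=" + URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available on the JVM, so this should not happen.
            throw new IllegalStateException(e);
        }
    }

    /** Body for a temporary key: the value plus a time to live in seconds. */
    static String setTempBody(String value, int ttl) {
        return field("value", value) + "&ttl=" + ttl;
    }

    public static void main(String[] args) {
        System.out.println(setTempBody("Rick Was here", 5));
    }
}
```

Encoding the value matters because a raw & or = inside it would otherwise be read as a field separator.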

/**
 * Created by rhightower on 10/8/14.
 */
public class EtcdClient implements Etcd {

    /** Vertx which is the websocket lib we use. */
    private final Vertx vertx;

    private HttpClient httpClient;

    /** Are we closed. */
    private volatile boolean closed;

    /** Host to connect to. */
    private final String host;

    /** Port of host to connect to. */
    private final int port;

    private int timeout = 5000;

    private ThreadLocal<JsonParserAndMapper> jsonParserAndMapperThreadLocal =
            new ThreadLocal<JsonParserAndMapper>() {
                @Override
                protected JsonParserAndMapper initialValue() {
                    return new JsonParserFactory().create();
                }
            };

    public EtcdClient(Vertx vertx, String host, int port) {
        this.vertx = vertx == null ? VertxFactory.newVertx() : vertx;
        this.host = host;
        this.port = port;
        connect();
    }

    public EtcdClient(Vertx vertx, String host, int port, int milliSecondTimeout) {
        this.vertx = vertx == null ? VertxFactory.newVertx() : vertx;
        this.host = host;
        this.port = port;
        this.timeout = milliSecondTimeout;
        connect();
    }

    public EtcdClient(String host, int port) {
        this(null, host, port);
    }

    @Override
    public Response delete(String key) {
        final BlockingQueue<Response> responseBlockingQueue = new ArrayBlockingQueue<>(1);
        HttpClientRequest httpClientRequest = httpClient.delete(
                Str.add("/v2/keys/", key),
                getResponseHandler("delete", key, responseBlockingQueue));
        httpClientRequest.end();
        return getResponse(key, responseBlockingQueue);
    }

    @Override
    public void delete(org.boon.core.Handler<Response> responseHandler, String key) {
        HttpClientRequest httpClientRequest = httpClient.delete(
                Str.add("/v2/keys/", key),
                handleResponse("delete", key, responseHandler));
        httpClientRequest.end();
    }

    @Override
    public Response deleteDir(String key) {
        final BlockingQueue<Response> responseBlockingQueue = new ArrayBlockingQueue<>(1);
        HttpClientRequest httpClientRequest = httpClient.delete(
                Str.add("/v2/keys/", key, "?dir=true"),
                getResponseHandler("delete", key, responseBlockingQueue));
        httpClientRequest.end();
        return getResponse(key, responseBlockingQueue);
    }

    @Override
    public void deleteDir(org.boon.core.Handler<Response> responseHandler, String name) {
        // Same request as the blocking deleteDir; the response goes to the handler.
        HttpClientRequest httpClientRequest = httpClient.delete(
                Str.add("/v2/keys/", name, "?dir=true"),
                handleResponse("delete", name, responseHandler));
        httpClientRequest.end();
    }

    @Override
    public Response deleteDirRecursively(String key) {
        final BlockingQueue<Response> responseBlockingQueue = new ArrayBlockingQueue<>(1);
        HttpClientRequest httpClientRequest = httpClient.delete(
                Str.add("/v2/keys/", key, "?dir=true&recursive=true"),
                getResponseHandler("delete", key, responseBlockingQueue));
        httpClientRequest.end();
        return getResponse(key, responseBlockingQueue);
    }

    @Override
    public void deleteDirRecursively(org.boon.core.Handler<Response> responseHandler, String key) {
        HttpClientRequest httpClientRequest = httpClient.delete(
                Str.add("/v2/keys/", key, "?dir=true&recursive=true"),
                handleResponse("delete", key, responseHandler));
        httpClientRequest.end();
    }

    @Override
    public Response deleteIfAtIndex(String key, long index) {
        final BlockingQueue<Response> responseBlockingQueue = new ArrayBlockingQueue<>(1);
        // etcd v2 reads the delete condition from the query string, not the request body.
        HttpClientRequest httpClientRequest = httpClient.delete(
                Str.add("/v2/keys/", key, "?prevIndex=", Long.toString(index)),
                getResponseHandler("delete", key, responseBlockingQueue));
        httpClientRequest.end();
        return getResponse(key, responseBlockingQueue);
    }

    @Override
    public void deleteIfAtIndex(org.boon.core.Handler<Response> responseHandler, String key, long index) {
        // etcd v2 reads the delete condition from the query string, not the request body.
        HttpClientRequest httpClientRequest = httpClient.delete(
                Str.add("/v2/keys/", key, "?prevIndex=", Long.toString(index)),
                handleResponse("delete", key, responseHandler));
        httpClientRequest.end();
    }

    @Override
    public Response deleteIfValue(String key, String prevValue) {
        final BlockingQueue<Response> responseBlockingQueue = new ArrayBlockingQueue<>(1);
        // Build the URL-encoded condition, then pass it as a query string.
        Buffer query = new Buffer(20);
        addField(query, "prevValue", prevValue);
        HttpClientRequest httpClientRequest = httpClient.delete(
                Str.add("/v2/keys/", key, "?", query.toString()),
                getResponseHandler("delete", key, responseBlockingQueue));
        httpClientRequest.end();
        return getResponse(key, responseBlockingQueue);
    }

    @Override
    public void deleteIfValue(org.boon.core.Handler<Response> responseHandler, String key, String prevValue) {
        // Build the URL-encoded condition, then pass it as a query string.
        Buffer query = new Buffer(20);
        addField(query, "prevValue", prevValue);
        HttpClientRequest httpClientRequest = httpClient.delete(
                Str.add("/v2/keys/", key, "?", query.toString()),
                handleResponse("delete", key, responseHandler));
        httpClientRequest.end();
    }

    @Override
    public void createDir(org.boon.core.Handler<Response> responseHandler, String key) {
        HttpClientRequest httpClientRequest = httpClient.put(
                Str.add("/v2/keys/", key),
                handleResponse("set", key, responseHandler));
        Buffer buffer = new Buffer(key.length());
        buffer.appendString("dir=true");
        sendToEtcd(httpClientRequest, buffer);
    }

    @Override
    public Response createDir(String key) {
        final BlockingQueue<Response> responseBlockingQueue = new ArrayBlockingQueue<>(1);
        HttpClientRequest httpClientRequest = httpClient.put(
                Str.add("/v2/keys/", key),
                getResponseHandler("set", key, responseBlockingQueue));
        Buffer buffer = new Buffer(key.length());
        buffer.appendString("dir=true");
        sendToEtcd(httpClientRequest, buffer);
        return getResponse(key, responseBlockingQueue);
    }

    @Override
    public Response createTempDir(String key, long ttl) {
        final BlockingQueue<Response> responseBlockingQueue = new ArrayBlockingQueue<>(1);
        HttpClientRequest httpClientRequest = httpClient.put(
                Str.add("/v2/keys/", key),
                getResponseHandler("set", key, responseBlockingQueue));
        Buffer buffer = new Buffer(key.length());
        buffer.appendString("dir=true&ttl=" + ttl);
        sendToEtcd(httpClientRequest, buffer);
        return getResponse(key, responseBlockingQueue);
    }

    @Override
    public void createTempDir(org.boon.core.Handler<Response> responseHandler, String key, long ttl) {
        HttpClientRequest httpClientRequest = httpClient.put(
                Str.add("/v2/keys/", key),
                handleResponse("set", key, responseHandler));
        Buffer buffer = new Buffer(key.length());
        buffer.appendString("dir=true&ttl=" + ttl);
        sendToEtcd(httpClientRequest, buffer);
    }

    @Override
    public Response updateDirTTL(String key, long ttl) {
        final BlockingQueue<Response> responseBlockingQueue = new ArrayBlockingQueue<>(1);
        HttpClientRequest httpClientRequest = httpClient.put(
                Str.add("/v2/keys/", key),
                getResponseHandler("set", key, responseBlockingQueue));
        Buffer buffer = new Buffer(key.length());
        buffer.appendString("prevExist=true&dir=true&ttl=" + ttl);
        sendToEtcd(httpClientRequest, buffer);
        return getResponse(key, responseBlockingQueue);
    }

    @Override
    public void updateDirTTL(org.boon.core.Handler<Response> responseHandler, String name, long ttl) {
        HttpClientRequest httpClientRequest = httpClient.put(
                Str.add("/v2/keys/", name),
                handleResponse("set", name, responseHandler));
        Buffer buffer = new Buffer(name.length());
        buffer.appendString("prevExist=true&dir=true&ttl=" + ttl);
        sendToEtcd(httpClientRequest, buffer);
    }

    @Override
    public Response list(String key) {
        return get(key);
    }

    @Override
    public void list(org.boon.core.Handler<Response> responseHandler, String key) {
        get(responseHandler, key);
    }

    @Override
    public Response listRecursive(String key) {
        final BlockingQueue<Response> responseBlockingQueue = new ArrayBlockingQueue<>(1);
        HttpClientRequest httpClientRequest = httpClient.get(
                Str.add("/v2/keys/", key, "?recursive=true"),
                getResponseHandler("get", key, responseBlockingQueue));
        httpClientRequest.end();
        return getResponse(key, responseBlockingQueue);
    }

    @Override
    public void listRecursive(org.boon.core.Handler<Response> responseHandler, String key) {
        HttpClientRequest httpClientRequest = httpClient.get(
                Str.add("/v2/keys/", key, "?recursive=true"),
                handleResponse("get", key, responseHandler));
        httpClientRequest.end();
    }

    @Override
    public Response listSorted(String key) {
        final BlockingQueue<Response> responseBlockingQueue = new ArrayBlockingQueue<>(1);
        HttpClientRequest httpClientRequest = httpClient.get(
                Str.add("/v2/keys/", key, "?recursive=true&sorted=true"),
                getResponseHandler("get", key, responseBlockingQueue));
        httpClientRequest.end();
        return getResponse(key, responseBlockingQueue);
    }

    @Override
    public void listSorted(org.boon.core.Handler<Response> responseHandler, String key) {
        HttpClientRequest httpClientRequest = httpClient.get(
                Str.add("/v2/keys/", key, "?recursive=true&sorted=true"),
                handleResponse("get", key, responseHandler));
        httpClientRequest.end();
    }

    @Override
    public Response addToDir(String dirName, String key, String value) {
        final BlockingQueue<Response> responseBlockingQueue = new ArrayBlockingQueue<>(1);
        String uri = Str.add("/v2/keys/", dirName, "/", key);
        HttpClientRequest httpClientRequest = httpClient.post(uri,
                getResponseHandler("set", key, responseBlockingQueue));
        Buffer buffer = new Buffer(value.length());
        addField(buffer, "value", value);
        sendToEtcd(httpClientRequest, buffer);
        return getResponse(key, responseBlockingQueue);
    }

    @Override
    public void addToDir(org.boon.core.Handler<Response> responseHandler, String dirName, String key, String value) {
        String uri = Str.add("/v2/keys/", dirName, "/", key);
        HttpClientRequest httpClientRequest = httpClient.post(uri,
                handleResponse("set", key, responseHandler));
        Buffer buffer = new Buffer(value.length());
        addField(buffer, "value", value);
        sendToEtcd(httpClientRequest, buffer);
    }

    public Response set(String key, String value) {
        final BlockingQueue<Response> responseBlockingQueue = new ArrayBlockingQueue<>(1);
        HttpClientRequest httpClientRequest = httpClient.put(
                Str.add("/v2/keys/", key),
                getResponseHandler("set", key, responseBlockingQueue));
        Buffer buffer = new Buffer(value.length());
        addField(buffer, "value", value);
        sendToEtcd(httpClientRequest, buffer);
        return getResponse(key, responseBlockingQueue);
    }

    @Override
    public void set(org.boon.core.Handler<Response> responseHandler, String key, String value) {
        HttpClientRequest httpClientRequest = httpClient.put(
                Str.add("/v2/keys/", key),
                handleResponse("set", key, responseHandler));
        Buffer buffer = new Buffer(value.length());
        addField(buffer, "value", value);
        sendToEtcd(httpClientRequest, buffer);
    }

    @Override
    public Response setConfigFile(String key, String fileName) {
        if (!IO.exists(fileName)) {
            die("setConfigFile", "file name does not exist", fileName);
        }
        return this.set(key, IO.read(fileName));
    }

    @Override
    public void setConfigFile(org.boon.core.Handler<Response> responseHandler, String key, String fileName) {
        if (!IO.exists(fileName)) {
            die("setConfigFile", "file name does not exist", fileName);
        }
        this.set(responseHandler, key, IO.read(fileName));
    }

    @Override
    public Response setIfExists(String key, String value) {
        final BlockingQueue<Response> responseBlockingQueue = new ArrayBlockingQueue<>(1);
        HttpClientRequest httpClientRequest = httpClient.put(
                Str.add("/v2/keys/", key),
                getResponseHandler("set", key, responseBlockingQueue));
        Buffer buffer = new Buffer(value.length() + 50);
        buffer.appendString("prevExist=true");
        addField(buffer, "&value", value);
        sendToEtcd(httpClientRequest, buffer);
        return getResponse(key, responseBlockingQueue);
    }

    @Override
    public void setIfExists(org.boon.core.Handler<Response> responseHandler, String key, String value) {
        HttpClientRequest httpClientRequest = httpClient.put(
                Str.add("/v2/keys/", key),
                handleResponse("set", key, responseHandler));
        Buffer buffer = new Buffer(value.length() + 50);
        buffer.appendString("prevExist=true");
        addField(buffer, "&value", value);
        sendToEtcd(httpClientRequest, buffer);
    }

    @Override
    public Response setIfNotExists(String key, String value) {
        final BlockingQueue<Response> responseBlockingQueue = new ArrayBlockingQueue<>(1);
        HttpClientRequest httpClientRequest = httpClient.put(
                Str.add("/v2/keys/", key),
                getResponseHandler("set", key, responseBlockingQueue));
        Buffer buffer = new Buffer(value.length() + 50);
        buffer.appendString("prevExist=false");
        addField(buffer, "&value", value);
        sendToEtcd(httpClientRequest, buffer);
        return getResponse(key, responseBlockingQueue);
    }

    @Override
    public void setIfNotExists(org.boon.core.Handler<Response> responseHandler, String key, String value) {
        HttpClientRequest httpClientRequest = httpClient.put(
                Str.add("/v2/keys/", key),
                handleResponse("set", key, responseHandler));
        Buffer buffer = new Buffer(value.length() + 50);
        buffer.appendString("prevExist=false");
        addField(buffer, "&value", value);
        sendToEtcd(httpClientRequest, buffer);
    }

    @Override
    public Response compareAndSwapByValue(String key, String prevValue, String value) {
        final BlockingQueue<Response> responseBlockingQueue = new ArrayBlockingQueue<>(1);
        HttpClientRequest httpClientRequest = httpClient.put(
                Str.add("/v2/keys/", key),
                getResponseHandler("set", key, responseBlockingQueue));
        Buffer buffer = new Buffer(50 + prevValue.length() + value.length());
        addField(buffer, "prevValue", prevValue);
        // addField, not appendString(str, enc): the two-argument form takes an encoding name.
        addField(buffer, "&value", value);
        sendToEtcd(httpClientRequest, buffer);
        return getResponse(key, responseBlockingQueue);
    }

    @Override
    public void compareAndSwapByValue(org.boon.core.Handler<Response> responseHandler, String key, String prevValue, String value) {
        HttpClientRequest httpClientRequest = httpClient.put(
                Str.add("/v2/keys/", key),
                handleResponse("set", key, responseHandler));
        Buffer buffer = new Buffer(50 + prevValue.length() + value.length());
        addField(buffer, "prevValue", prevValue);
        addField(buffer, "&value", value);
        sendToEtcd(httpClientRequest, buffer);
    }

    @Override
    public Response compareAndSwapByModifiedIndex(String key, long prevIndex, String value) {
        final BlockingQueue<Response> responseBlockingQueue = new ArrayBlockingQueue<>(1);
        HttpClientRequest httpClientRequest = httpClient.put(
                Str.add("/v2/keys/", key),
                getResponseHandler("set", key, responseBlockingQueue));
        Buffer buffer = new Buffer(20 + value.length());
        addField(buffer, "prevIndex", prevIndex);
        addField(buffer, "&value", value);
        sendToEtcd(httpClientRequest, buffer);
        return getResponse(key, responseBlockingQueue);
    }

    @Override
    public void compareAndSwapByModifiedIndex(org.boon.core.Handler<Response> responseHandler, String key, long prevIndex, String value) {
        HttpClientRequest httpClientRequest = httpClient.put(
                Str.add("/v2/keys/", key),
                handleResponse("set", key, responseHandler));
        Buffer buffer = new Buffer(20 + value.length());
        addField(buffer, "prevIndex", prevIndex);
        addField(buffer, "&value", value);
        sendToEtcd(httpClientRequest, buffer);
    }

    /** Appends fieldName=value with the value URL-encoded as UTF-8. */
    private void addField(Buffer buffer, String fieldName, String value) {
        try {
            buffer.appendString(fieldName).appendString("=").appendString(
                    URLEncoder.encode(value, StandardCharsets.UTF_8.name()));
        } catch (UnsupportedEncodingException e) {
            Exceptions.handle(e);
        }
    }

    private void addField(Buffer buffer, String fieldName, long value) {
        buffer.appendString(fieldName).appendString("=").appendString(Long.toString(value));
    }

    private void addField(Buffer buffer, String fieldName, boolean value) {
        buffer.appendString(fieldName).appendString("=").appendString(Boolean.toString(value));
    }

    @Override
    public Response setTemp(String key, String value, int ttl) {
        final BlockingQueue<Response> responseBlockingQueue = new ArrayBlockingQueue<>(1);
        HttpClientRequest httpClientRequest = httpClient.put(
                Str.add("/v2/keys/", key),
                getResponseHandler("set", key, responseBlockingQueue));
        Buffer buffer = new Buffer(value.length());
        addField(buffer, "value", value);
        buffer.appendString("&ttl=").appendString(Integer.toString(ttl));
        sendToEtcd(httpClientRequest, buffer);
        return getResponse(key, responseBlockingQueue);
    }

    @Override
    public void setTemp(org.boon.core.Handler<Response> responseHandler, String key, String value, int ttl) {
        HttpClientRequest httpClientRequest = httpClient.put(
                Str.add("/v2/keys/", key),
                handleResponse("set", key, responseHandler));
        Buffer buffer = new Buffer(value.length());
        addField(buffer, "value", value);
        buffer.appendString("&ttl=").appendString(Integer.toString(ttl));
        sendToEtcd(httpClientRequest, buffer);
    }

    @Override
    public Response removeTTL(String key, String value) {
        final BlockingQueue<Response> responseBlockingQueue = new ArrayBlockingQueue<>(1);
        HttpClientRequest httpClientRequest = httpClient.put(
                Str.add("/v2/keys/", key),
                getResponseHandler("set", key, responseBlockingQueue));
        Buffer buffer = new Buffer(value.length());
        addField(buffer, "value", value);
        buffer.appendString("&ttl=&");
        addField(buffer, "prevExist", true);
        sendToEtcd(httpClientRequest, buffer);
        return getResponse(key, responseBlockingQueue);
    }

    @Override
    public void removeTTL(org.boon.core.Handler<Response> responseHandler, String key, String value) {
        HttpClientRequest httpClientRequest = httpClient.put(
                Str.add("/v2/keys/", key),
                handleResponse("set", key, responseHandler));
        Buffer buffer = new Buffer(value.length());
        addField(buffer, "value", value);
        buffer.appendString("&ttl=&");
        addField(buffer, "prevExist", true);
        sendToEtcd(httpClientRequest, buffer);
    }

    /** Sends the buffer as a form-encoded request body. */
    private void sendToEtcd(HttpClientRequest httpClientRequest, Buffer buffer) {
        httpClientRequest.putHeader("Content-Type", "application/x-www-form-urlencoded").end(buffer);
    }

    @Override
    public Response get(String key) {
        final BlockingQueue<Response> responseBlockingQueue = new ArrayBlockingQueue<>(1);
        HttpClientRequest httpClientRequest = httpClient.get(
                Str.add("/v2/keys/", key),
                getResponseHandler("get", key, responseBlockingQueue));
        httpClientRequest.end();
        return getResponse(key, responseBlockingQueue);
    }

    @Override
    public void get(org.boon.core.Handler<Response> responseHandler, String key) {
        HttpClientRequest httpClientRequest = httpClient.get(
                Str.add("/v2/keys/", key),
                handleResponse("get", key, responseHandler));
        httpClientRequest.end();
    }

    @Override
    public Response getConsistent(String key) {
        final BlockingQueue<Response> responseBlockingQueue = new ArrayBlockingQueue<>(1);
        HttpClientRequest httpClientRequest = httpClient.get(
                Str.add("/v2/keys/", key, "?consistent=true"),
                getResponseHandler("get", key, responseBlockingQueue));
        httpClientRequest.end();
        return getResponse(key, responseBlockingQueue);
    }

    @Override
    public void getConsistent(org.boon.core.Handler<Response> responseHandler, String key) {
        HttpClientRequest httpClientRequest = httpClient.get(
                Str.add("/v2/keys/", key, "?consistent=true"),
                handleResponse("get", key, responseHandler));
        httpClientRequest.end();
    }

    @Override
    public Response wait(String key) {
        final BlockingQueue<Response> responseBlockingQueue = new ArrayBlockingQueue<>(1);
        HttpClientRequest httpClientRequest = httpClient.get(
                Str.add("/v2/keys/", key, "?wait=true"),
                getResponseHandler("get", key, responseBlockingQueue));
        httpClientRequest.end();
        // Long poll: block until etcd reports a change, with no timeout.
        return getResponseWaitForever(key, responseBlockingQueue);
    }

    @Override
    public void wait(org.boon.core.Handler<Response> responseHandler, String key) {
        HttpClientRequest httpClientRequest = httpClient.get(
                Str.add("/v2/keys/", key, "?wait=true"),
                handleResponse("get", key, responseHandler));
        httpClientRequest.end();
    }

    @Override
    public Response wait(String key, long index) {
        final BlockingQueue<Response> responseBlockingQueue = new ArrayBlockingQueue<>(1);
        HttpClientRequest httpClientRequest = httpClient.get(
                Str.add("/v2/keys/", key, "?wait=true&waitIndex=", Long.toString(index)),
                getResponseHandler("get", key, responseBlockingQueue));
        httpClientRequest.end();
        return getResponseWaitForever(key, responseBlockingQueue);
    }

    @Override
    public void wait(org.boon.core.Handler<Response> responseHandler, String key, long index) {
        HttpClientRequest httpClientRequest = httpClient.get(
                Str.add("/v2/keys/", key, "?wait=true&waitIndex=", Long.toString(index)),
                handleResponse("get", key, responseHandler));
        httpClientRequest.end();
    }

    @Override
    public Response waitRecursive(String key) {
        final BlockingQueue<Response> responseBlockingQueue = new ArrayBlockingQueue<>(1);
        HttpClientRequest httpClientRequest = httpClient.get(
                Str.add("/v2/keys/", key, "?wait=true&recursive=true"),
                getResponseHandler("get", key, responseBlockingQueue));
        httpClientRequest.end();
        return getResponseWaitForever(key, responseBlockingQueue);
    }

    @Override
    public void waitRecursive(org.boon.core.Handler<Response> responseHandler, String key) {
        HttpClientRequest httpClientRequest = httpClient.get(
                Str.add("/v2/keys/", key, "?wait=true&recursive=true"),
                handleResponse("get", key, responseHandler));
        httpClientRequest.end();
    }

    @Override
    public Response waitRecursive(String key, long index) {
        final BlockingQueue<Response> responseBlockingQueue = new ArrayBlockingQueue<>(1);
        // Uses waitIndex, the same parameter name as wait(key, index).
        HttpClientRequest httpClientRequest = httpClient.get(
                Str.add("/v2/keys/", key, "?wait=true&recursive=true&waitIndex=", Long.toString(index)),
                getResponseHandler("get", key, responseBlockingQueue));
        httpClientRequest.end();
        return getResponseWaitForever(key, responseBlockingQueue);
    }

    @Override
    public void waitRecursive(org.boon.core.Handler<Response> responseHandler, String key, long index) {
        // Uses waitIndex, the same parameter name as wait(key, index).
        HttpClientRequest httpClientRequest = httpClient.get(
                Str.add("/v2/keys/", key, "?wait=true&recursive=true&waitIndex=", Long.toString(index)),
                handleResponse("get", key, responseHandler));
        httpClientRequest.end();
    }

    /** Waits up to the configured timeout for the response handler to deliver a response. */
    private Response getResponse(String key, BlockingQueue<Response> responseBlockingQueue) {
        try {
            Response response = responseBlockingQueue.poll(timeout, TimeUnit.MILLISECONDS);
            if (response == null) {
                if (this.closed) {
                    throw new ConnectionException(Str.add("Connection exception for key ", key,
                            " host ", host, " port ", "" + port));
                }
                throw new TimeoutException(Str.add("Response timeout for get request key=", key));
            }
            return response;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can see the interruption.
            Thread.currentThread().interrupt();
        }
        return null;
    }

    /** Blocks until a response arrives; used by the wait (long polling) calls. */
    private Response getResponseWaitForever(String key, BlockingQueue<Response> responseBlockingQueue) {
        try {
            Response response = responseBlockingQueue.take();
            if (response == null) {
                die("Response timeout for get request key=", key);
            }
            return response;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can see the interruption.
            Thread.currentThread().interrupt();
        }
        return null;
    }

    /** Collects the response body, parses it, and hands the result to the blocking queue. */
    private Handler<HttpClientResponse> getResponseHandler(final String action, final String key,
            final BlockingQueue<Response> responseBlockingQueue) {
        return new Handler<HttpClientResponse>() {
            @Override
            public void handle(final HttpClientResponse httpClientResponse) {
                final Buffer buffer = new Buffer(1000);
                httpClientResponse.dataHandler(new Handler<Buffer>() {
                    @Override
                    public void handle(Buffer partialBuf) {
                        buffer.appendBuffer(partialBuf);
                    }
                }).endHandler(new Handler<Void>() {
                    @Override
                    public void handle(Void aVoid) {
                        String json = buffer.toString();
                        Response response = parseResponse(json, action, key, httpClientResponse);
                        responseBlockingQueue.offer(response);
                    }
                }).exceptionHandler(new Handler<Throwable>() {
                    @Override
                    public void handle(Throwable event) {
                        Response response = createResponseFromException(action, key, event);
                        responseBlockingQueue.offer(response);
                    }
                });
            }
        };
    }

    private Response createResponseFromException(String action, String key, Throwable throwable) {
        if (throwable instanceof ConnectException) {
            closed = true;
            Error error = new Error(-1, throwable.getClass().getName(),
                    Str.add("Unable to connect to host ", this.host, " port ", "" + this.port), 0L);
            return new Response(action, -1, error);
        }
        Error error = new Error(-1, throwable.getClass().getName(),
                Str.add(throwable.getMessage(), " action ", action, " key ", key,
                        " host ", this.host, " port ", "" + this.port), 0L);
        return new Response(action, -1, error);
    }

    private Handler<HttpClientResponse>
handleResponse(final String action, final String key, final org.boon.core.
Handler<Response> handler) { return new Handler<HttpClientResponse>() {
@Override public void handle(final HttpClientResponse httpClientResponse) {
final Buffer buffer = new Buffer(1000); httpClientResponse.dataHandler(new
Handler<Buffer>() { @Override public void handle(Buffer partialBuf) { buffer
.appendBuffer(partialBuf); } }).endHandler(new Handler<Void>() { @Override
public void handle(Void aVoid) { String json = buffer.toString(); Response
response = parseResponse(json, action, key, httpClientResponse); handler.
handle(response); } }).exceptionHandler(new Handler<Throwable>() { @Override
public void handle(Throwable event) { Response response =
createResponseFromException(action, key, event); handler.handle(response); }
}); } }; } private Response parseResponse(String json, String action, String
key, HttpClientResponse httpClientResponse) { try { Response response;
switch (httpClientResponse.statusCode()) { case 307: response = new
RedirectResponse(httpClientResponse.headers().get("Location")); return
response; case 200: response = jsonParserAndMapperThreadLocal.get().parse(
Response.class, json); response.setHttpStatusCode(httpClientResponse.
statusCode()); return response; case 201: response =
jsonParserAndMapperThreadLocal.get().parse(Response.class, json); response.
setHttpStatusCode(httpClientResponse.statusCode()); response.setCreated();
return response; case 404: Error notFound = jsonParserAndMapperThreadLocal.
get().parse(Error.class, json); response = new Response(action,
httpClientResponse.statusCode(), notFound); return response; default: if (!
isEmpty(json) && (json.contains("cause") || json.contains("errorCode") ) ) {
Error error = jsonParserAndMapperThreadLocal.get().parse(Error.class, json);
response = new Response(action, httpClientResponse.statusCode(), error);
return response; } else if (!isEmpty(json)){ response =
jsonParserAndMapperThreadLocal.get().parse(Response.class, json); response.
setHttpStatusCode(httpClientResponse.statusCode()); return response; } else
{ puts(httpClientResponse.statusCode(), httpClientResponse.headers().entries
()); return null; } } } catch (Exception ex) { if (!Str.isEmpty(json)) {
return createResponseFromException(action + "\n" + json + "\n", key, ex); }
else { return createResponseFromException(action + " blank response", key,
ex); } } } private void connect() { httpClient = vertx.createHttpClient().
setHost(host).setPort(port).setConnectTimeout(timeout).setMaxPoolSize(20).
exceptionHandler(new Handler<Throwable>() { @Override public void handle(
Throwable throwable) { if (throwable instanceof ConnectException) { closed =
true; } else { puts(throwable); throwable.printStackTrace(); } } }); }
public boolean isClosed() { return closed; }}
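For anyone skimming: the blocking methods above all lean on the same trick, which is handing the async result to the waiting thread through a one-slot BlockingQueue. Below is a minimal standalone sketch of that pattern. It does not use Vert.x or Boon; `BlockingBridge`, `AsyncCall`, and `await` are hypothetical names made up for this illustration, and the timeout is reported with a plain `IllegalStateException` rather than the client's own exception types.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical helper showing the sync-over-async pattern; not part of the etcd client. */
final class BlockingBridge {

    /** Stand-in for an async call such as an HTTP request with a response handler. */
    interface AsyncCall<T> {
        void start(Consumer<T> callback);
    }

    /** Starts the async call and blocks until the callback fires or the timeout expires. */
    static <T> T await(AsyncCall<T> call, long timeoutMillis) {
        final BlockingQueue<T> queue = new ArrayBlockingQueue<>(1);
        call.start(queue::offer);   // the callback hands the result to the waiting thread
        try {
            T result = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (result == null) {
                throw new IllegalStateException("no response within " + timeoutMillis + " ms");
            }
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();   // keep the interrupt visible to callers
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        // Simulate a response arriving on another thread.
        String value = await(cb -> new Thread(() -> cb.accept("hello")).start(), 1000);
        System.out.println(value);
    }
}
```

One caveat with this approach: it only works when the callback runs on a different thread from the one that is blocked in `await`, otherwise the waiting thread can never be woken up.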


Rick Hight

Oct 9, 2014, 11:49:19 AM10/9/14
to ve...@googlegroups.com
About the etcd Java client: I figured I would make it recover well from errors, and even created an exception hierarchy (which I am typically loath to do). Along the way I noticed some odd behavior that seems to be undocumented. When you kill an etcd node and then restart it, it is no longer the leader, and any set operation sent to it gets an HTTP 307 (Temporary Redirect) pointing at the new leader. This is all well and good if you are using curl with -L, which will follow said 307 bunny trail, or Boon HTTP utils, which use java.net.URL. Vert.x (which I am using) does not follow the redirect automagically. Then I figured you might actually want to know when you get 307'ed to a new server because the leader changed, so I created a new response type called RedirectResponse:


        // 'handler' is a Handler<Response> defined elsewhere.
        EtcdClient client = new EtcdClient("localhost", 4001);
        client.get(handler, "foo");


        client.set(new Handler<Response>() {
            @Override
            public void handle(Response event) {
                if (event instanceof RedirectResponse) {
                    // 307: this node is not the leader, so retry the write against the leader.
                    URI location = ((RedirectResponse) event).location();
                    EtcdClient leaderClient = new EtcdClient(location.getHost(), location.getPort());
                    leaderClient.set(handler, "foo", "Rick found the other server");
                } else {
                    handler.handle(event);
                }
            }
        }, "foo", "Rick Was here");

Now of course I realize I really need a command object instead of hand coding each request/type of operation (I thought there would be about six operations; midway through the implementation I realized my fallacy). With that I can create a client facade on top of the existing client that lets you configure, let's say, the first three nodes of an etcd cluster. It queries the admin API to get all of the machines in the cluster (so you have a very large fallback list). Gets (reads) either fall back automatically to the next node in round-robin order, or always round robin to spread load. The facade handles RedirectResponse automatically and keeps track of the current leader for writes. Devil in the details.
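To make the facade idea a bit more concrete, here is a rough sketch of just the bookkeeping part: rotating reads over the known nodes and remembering the leader after a 307. None of this exists in Boon as far as this thread shows; `EtcdNodeSelector` and its methods are hypothetical names, and starting with the first seed node as the assumed leader is purely an assumption of the sketch.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper; not part of Boon or the etcd client shown in this thread. */
final class EtcdNodeSelector {

    private final List<URI> nodes = new ArrayList<>();
    private int nextRead = 0;
    private URI leader;

    EtcdNodeSelector(List<URI> seedNodes) {
        if (seedNodes.isEmpty()) {
            throw new IllegalArgumentException("need at least one seed node");
        }
        nodes.addAll(seedNodes);
        leader = seedNodes.get(0);   // assumption: guess the first seed until a 307 says otherwise
    }

    /** Reads rotate over all known nodes to spread load. */
    synchronized URI nextReadNode() {
        URI node = nodes.get(nextRead);
        nextRead = (nextRead + 1) % nodes.size();
        return node;
    }

    /** Writes go to whichever node is currently believed to be the leader. */
    synchronized URI writeNode() {
        return leader;
    }

    /** Call with the Location header of a 307 response to remember the new leader. */
    synchronized void onRedirect(String locationHeader) {
        URI location = URI.create(locationHeader);
        String authority = location.getPort() < 0
                ? location.getHost()
                : location.getHost() + ":" + location.getPort();
        URI node = URI.create(location.getScheme() + "://" + authority);
        if (!nodes.contains(node)) {
            nodes.add(node);   // a leader we had not heard of yet
        }
        leader = node;
    }

    public static void main(String[] args) {
        EtcdNodeSelector selector = new EtcdNodeSelector(Arrays.asList(
                URI.create("http://127.0.0.1:4001"),
                URI.create("http://127.0.0.1:4002")));
        selector.onRedirect("http://127.0.0.1:4003/v2/keys/foo");
        System.out.println("writes now go to " + selector.writeNode());
    }
}
```

A real facade would still need the parts this skips: discovering the machine list from the cluster, dropping dead nodes, and replaying the original request after a redirect.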



Jordan Halterman

Oct 9, 2014, 1:02:57 PM10/9/14
to ve...@googlegroups.com
Not to be an A-hole or anything (I know this is a Vert.x forum and not a Raft/consensus forum), but I just have to clarify that Raft is not a "high availability" algorithm in distributed systems terms. It falls in the CP area of the CAP theorem and will thus sacrifice availability for consistency in the face of a partition, which makes it perfect for things like service discovery and configuration management, as you said. That is, it can tolerate only a minority of the cluster being down. Nevertheless, awesome :-)

Source: http://github.com/kuujo/copycat
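To put numbers on "a minority of the cluster being down": a majority-based protocol like Raft needs floor(n/2) + 1 nodes reachable to make progress, so a cluster of n nodes keeps working with at most n minus that many nodes down. A tiny sketch of the arithmetic follows; `RaftQuorum` is just a name picked for this example.

```java
/** Illustrative arithmetic only; the class name is made up for this example. */
final class RaftQuorum {

    /** Smallest number of nodes that forms a majority of the cluster. */
    static int quorum(int clusterSize) {
        return clusterSize / 2 + 1;
    }

    /** How many nodes may be down while a majority is still reachable. */
    static int tolerableFailures(int clusterSize) {
        return clusterSize - quorum(clusterSize);
    }

    public static void main(String[] args) {
        for (int n = 1; n <= 5; n++) {
            System.out.println(n + " nodes: quorum " + quorum(n)
                    + ", tolerates " + tolerableFailures(n) + " down");
        }
    }
}
```

So a 3-node cluster survives one node being down and a 5-node cluster survives two, while going from 3 to 4 nodes buys no extra tolerance.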

> On Oct 9, 2014, at 1:18 AM, Rick Hight <richardh...@gmail.com> wrote:
>
> I wrote a little HTTP client for etcd using vertx.
>
> If you wanted an example of how to use vertx http client, it might not be a bad one.
>
> https://github.com/boonproject/boon/blob/master/etcd/README.md
>
> Boon etcd is a Java client for etcd.
>
> What is etcd?
>
> etcd is a highly-available key value store for shared configuration and service discovery.
>
> etcd is part of the coreos project.
>
> etcd is inspired by Apache ZooKeeper and doozer, with a focus on being:
>
> Simple: REST like and curl'able user facing API (HTTP+JSON); Secure: optional SSL client cert authentication; Fast: benchmarked 1000s of writes/s per instance; Reliable: properly distributed using Raft.
>
> etcd is written in Go and uses the Raft consensus algorithm to manage a highly-available replicated log.
>
> You can learn more about etcd at https://github.com/coreos/etcd.
>
> Boon etcd client for Java
>
> this.timeout = milliSecondTimeout;
> connect();
>
> }
>
>
>
> public EtcdClient(String host, int port) {
> this(null, host, port);
> }
>
> @Override
> public Response delete(String key) {
>
> final BlockingQueue<Response> responseBlockingQueue = new ArrayBlockingQueue<>(1);
>
> HttpClientRequest httpClientRequest = httpClient.delete(Str.add("/v2/keys/", key),
> getResponseHandler("delete", key, responseBlockingQueue));
>
> httpClientRequest.end();
>
> return getResponse(key, responseBlockingQueue);
>
> }
>
> @Override
> public void delete(org.boon.core.Handler<Response> responseHandler, String key) {
>
> HttpClientRequest httpClientRequest = httpClient.delete(Str.add("/v2/keys/", key),
> handleResponse("delete", key, responseHandler));
>
> httpClientRequest.end();
>
>
> }
>
> @Override
> public Response deleteDir(String key) {
>
> final BlockingQueue<Response> responseBlockingQueue = new ArrayBlockingQueue<>(1);
>
> HttpClientRequest httpClientRequest = httpClient.delete(Str.add("/v2/keys/", key, "?dir=true"),
> getResponseHandler("delete", key, responseBlockingQueue));
>
> httpClientRequest.end();
>
> return getResponse(key, responseBlockingQueue);
>
> }
>
> @Override
> public void deleteDir(org.boon.core.Handler<Response> responseHandler, String name) {
>
> }
>
> @Override
> public Response deleteDirRecursively(String key) {
>
> final BlockingQueue<Response> responseBlockingQueue = new ArrayBlockingQueue<>(1);
>
> HttpClientRequest httpClientRequest = httpClient.delete(Str.add("/v2/keys/", key, "?dir=true&recursive=true"),
> getResponseHandler("delete", key, responseBlockingQueue));
>
> httpClientRequest.end();
>
> return getResponse(key, responseBlockingQueue);
> }
>
> @Override
> public void deleteDirRecursively(org.boon.core.Handler<Response> responseHandler, String key) {
>
>
> HttpClientRequest httpClientRequest = httpClient.delete(Str.add("/v2/keys/", key, "?dir=true&recursive=true"),
> handleResponse("delete", key, responseHandler));
>
> httpClientRequest.end();
>
> }
>
> @Override
> public Response deleteIfAtIndex(String key, long index) {
>
> final BlockingQueue<Response> responseBlockingQueue = new ArrayBlockingQueue<>(1);
>
> HttpClientRequest httpClientRequest = httpClient.delete(Str.add("/v2/keys/", key), getResponseHandler("delete", key, responseBlockingQueue));
>
> Buffer buffer = new Buffer(20);
> addField(buffer, "prevIndex", index);
> httpClientRequest.end(buffer);
>
> return getResponse(key, responseBlockingQueue);
>
> }
>
> @Override
> public void deleteIfAtIndex(org.boon.core.Handler<Response> responseHandler, String key, long index) {
> HttpClientRequest httpClientRequest = httpClient.delete(Str.add("/v2/keys/", key),
> handleResponse("delete", key, responseHandler));
>
> Buffer buffer = new Buffer(20);
> addField(buffer, "prevIndex", index);
> httpClientRequest.end(buffer);
>
>
> }
>
> @Override
> public Response deleteIfValue(String key, String prevValue) {
> final BlockingQueue<Response> responseBlockingQueue = new ArrayBlockingQueue<>(1);
>
> HttpClientRequest httpClientRequest = httpClient.delete(Str.add("/v2/keys/", key),
> getResponseHandler("delete", key, responseBlockingQueue));
>
> Buffer buffer = new Buffer(20);
> addField(buffer, "prevValue", prevValue);
> httpClientRequest.end(buffer);
>
> return getResponse(key, responseBlockingQueue);
>
> }
>
> @Override
> public void deleteIfValue(org.boon.core.Handler<Response> responseHandler, String key, String prevValue) {
>
> HttpClientRequest httpClientRequest = httpClient.delete(Str.add("/v2/keys/", key), handleResponse("delete", key,
> responseHandler));
>
> Buffer buffer = new Buffer(20);
> addField(buffer, "prevValue", prevValue);
> httpClientRequest.end(buffer);
>
>
> }
>
>
> @Override
> public void createDir(org.boon.core.Handler<Response> responseHandler, String key) {
>
> HttpClientRequest httpClientRequest = httpClient.put(Str.add("/v2/keys/", key), handleResponse("set", key, responseHandler));
>
> Buffer buffer = new Buffer(key.length());
>
> buffer.appendString("dir=true");
>
> sendToEtcd(httpClientRequest, buffer);
>
>
> }
>
> @Override
> public Response createDir(String key) {
>
> final BlockingQueue<Response> responseBlockingQueue = new ArrayBlockingQueue<>(1);
>
> HttpClientRequest httpClientRequest = httpClient.put(Str.add("/v2/keys/", key), getResponseHandler("set", key, responseBlockingQueue));
>
> Buffer buffer = new Buffer(key.length());
>
> buffer.appendString("dir=true");
>
> sendToEtcd(httpClientRequest, buffer);
>
>
> return getResponse(key, responseBlockingQueue);
> }
>
> @Override
> public Response createTempDir(String key, long ttl) {
>
> final BlockingQueue<Response> responseBlockingQueue = new ArrayBlockingQueue<>(1);
>
> HttpClientRequest httpClientRequest = httpClient.put(Str.add("/v2/keys/", key), getResponseHandler("set", key, responseBlockingQueue));
>
> Buffer buffer = new Buffer(key.length());
>
> buffer.appendString("dir=true&ttl=" + ttl);
>
> sendToEtcd(httpClientRequest, buffer);
>
>
> return getResponse(key, responseBlockingQueue);
>
> }
>
> @Override
> public void createTempDir(org.boon.core.Handler<Response> responseHandler, String key, long ttl) {
>
>
> HttpClientRequest httpClientRequest = httpClient.put(Str.add("/v2/keys/", key),
> handleResponse("set", key, responseHandler));
>
> Buffer buffer = new Buffer(key.length());
>
> buffer.appendString("dir=true&ttl=" + ttl);
>
> sendToEtcd(httpClientRequest, buffer);
>
> }
>
> @Override
> public Response updateDirTTL(String key, long ttl) {
>
> final BlockingQueue<Response> responseBlockingQueue = new ArrayBlockingQueue<>(1);
>
> HttpClientRequest httpClientRequest = httpClient.put(Str.add("/v2/keys/", key), getResponseHandler("set", key, responseBlockingQueue));
>
> Buffer buffer = new Buffer(key.length());
>
> buffer.appendString("prevExist=true&dir=true&ttl=" + ttl);
>
> sendToEtcd(httpClientRequest, buffer);
>
>
> return getResponse(key, responseBlockingQueue);
> }
>
> @Override
> public void listSorted(org.boon.core.Handler<Response> responseHandler, String key) {
>
>
> HttpClientRequest httpClientRequest = httpClient.get(Str.add("/v2/keys/", key, "?recursive=true&sorted=true"),
>
> handleResponse("get", key, responseHandler));
>
> httpClientRequest.end();
>
>
>
> }
>
> @Override
> public Response addToDir(String dirName, String key, String value) {
>
>
> final BlockingQueue<Response> responseBlockingQueue = new ArrayBlockingQueue<>(1);
>
> String uri = Str.add("/v2/keys/", dirName, "/", key);
> HttpClientRequest httpClientRequest = httpClient.post(uri, getResponseHandler("set", key, responseBlockingQueue));
>
> Buffer buffer = new Buffer(value.length());
>
> addField(buffer, "value", value);
>
> sendToEtcd(httpClientRequest, buffer);
>
>
> return getResponse(key, responseBlockingQueue);
> }
>
> @Override
> public void addToDir(org.boon.core.Handler<Response> responseHandler, String dirName, String key, String value) {
>
> String uri = Str.add("/v2/keys/", dirName, "/", key);
> HttpClientRequest httpClientRequest = httpClient.post(uri, handleResponse("set", key, responseHandler));
>
> Buffer buffer = new Buffer(value.length());
>
> addField(buffer, "value", value);
>
> sendToEtcd(httpClientRequest, buffer);
> }
>
> public Response set(String key, String value) {
>
>
>
> final BlockingQueue<Response> responseBlockingQueue = new ArrayBlockingQueue<>(1);
>
> HttpClientRequest httpClientRequest = httpClient.put(Str.add("/v2/keys/", key), getResponseHandler("set", key, responseBlockingQueue));
>
> Buffer buffer = new Buffer(value.length());
>
> addField(buffer, "value", value);
>
> sendToEtcd(httpClientRequest, buffer);
>
>
> return getResponse(key, responseBlockingQueue);
>
> }
>
> @Override
> public void set(org.boon.core.Handler<Response> responseHandler, String key, String value) {
>
>
> HttpClientRequest httpClientRequest = httpClient.put(Str.add("/v2/keys/", key),
> handleResponse("set", key, responseHandler));
>
> Buffer buffer = new Buffer(value.length());
>
> addField(buffer, "value", value);
>
> sendToEtcd(httpClientRequest, buffer);
>
> }
>
> @Override
> public Response setConfigFile(String key, String fileName) {
> if (!IO.exists(fileName)) {
> die("setConfigFile", "file name does not exist", fileName);
> }
> return this.set(key, IO.read(fileName));
> }
>
> @Override
> public void setConfigFile(org.boon.core.Handler<Response> responseHandler, String key, String fileName) {
> if (!IO.exists(fileName)) {
> die("setConfigFile", "file name does not exist", fileName);
> }
> this.set(responseHandler, key, IO.read(fileName));
> }
>
> @Override
> public Response setIfExists(String key, String value) {
>
>
>
> final BlockingQueue<Response> responseBlockingQueue = new ArrayBlockingQueue<>(1);
>
> HttpClientRequest httpClientRequest = httpClient.put(Str.add("/v2/keys/", key), getResponseHandler("set", key, responseBlockingQueue));
>
> Buffer buffer = new Buffer(value.length() + 50);
>
> buffer.appendString("prevExist=true");
> addField(buffer, "&value", value);
>
> sendToEtcd(httpClientRequest, buffer);
>
>
> return getResponse(key, responseBlockingQueue);
>
> }
>
> @Override
> public void setIfExists(org.boon.core.Handler<Response> responseHandler, String key, String value) {
>
> HttpClientRequest httpClientRequest = httpClient.put(Str.add("/v2/keys/", key),
> handleResponse("set", key, responseHandler));
>
> Buffer buffer = new Buffer(value.length() + 50);
>
> buffer.appendString("prevExist=true");
> addField(buffer, "&value", value);
>
> sendToEtcd(httpClientRequest, buffer);
>
> }
>
> @Override
> public Response setIfNotExists(String key, String value) {
>
>
> final BlockingQueue<Response> responseBlockingQueue = new ArrayBlockingQueue<>(1);
>
> HttpClientRequest httpClientRequest = httpClient.put(Str.add("/v2/keys/", key), getResponseHandler("set", key, responseBlockingQueue));
>
> Buffer buffer = new Buffer(value.length() + 50);
>
> buffer.appendString("prevExist=false");
> addField(buffer, "&value", value);
>
> sendToEtcd(httpClientRequest, buffer);
>
>
> return getResponse(key, responseBlockingQueue);
>
> }
>
> @Override
> public void setIfNotExists(org.boon.core.Handler<Response> responseHandler, String key, String value) {
> HttpClientRequest httpClientRequest = httpClient.put(Str.add("/v2/keys/", key),
> handleResponse("set", key, responseHandler));
>
> Buffer buffer = new Buffer(value.length() + 50);
>
> buffer.appendString("prevExist=false");
> addField(buffer, "&value", value);
>
> sendToEtcd(httpClientRequest, buffer);
>
>
> }
>
> @Override
> public Response compareAndSwapByValue(String key, String prevValue, String value) {
>
>
> final BlockingQueue<Response> responseBlockingQueue = new ArrayBlockingQueue<>(1);
>
> HttpClientRequest httpClientRequest = httpClient.put(Str.add("/v2/keys/", key), getResponseHandler("set", key, responseBlockingQueue));
>
> Buffer buffer = new Buffer(50 + prevValue.length() + value.length());
>
> addField(buffer, "prevValue", prevValue);
>
> buffer.appendString("&value", value);
>
>
> sendToEtcd(httpClientRequest, buffer);
>
>
> return getResponse(key, responseBlockingQueue);
> }
>
> @Override
> public void compareAndSwapByValue(org.boon.core.Handler<Response> responseHandler, String key, String prevValue, String value) {
>
> HttpClientRequest httpClientRequest = httpClient.put(Str.add("/v2/keys/", key),
> handleResponse("set", key, responseHandler));
>
> Buffer buffer = new Buffer(50 + prevValue.length() + value.length());
>
> addField(buffer, "prevValue", prevValue);
>
> buffer.appendString("&value", value);
>
>
> sendToEtcd(httpClientRequest, buffer);
>
>
> }
>
> @Override
> public Response compareAndSwapByModifiedIndex(String key, long prevIndex, String value) {
>
> final BlockingQueue<Response> responseBlockingQueue = new ArrayBlockingQueue<>(1);
>
> HttpClientRequest httpClientRequest = httpClient.put(Str.add("/v2/keys/", key), getResponseHandler("set", key, responseBlockingQueue));
>
> Buffer buffer = new Buffer(20 + value.length());
>
> addField(buffer, "prevIndex", prevIndex);
>
> buffer.appendString("&value", value);
>
>
> sendToEtcd(httpClientRequest, buffer);
>
>
> return getResponse(key, responseBlockingQueue);
> }
>
> @Override
> public void compareAndSwapByModifiedIndex(org.boon.core.Handler<Response> responseHandler, String key, long prevIndex, String value) {
>
> HttpClientRequest httpClientRequest = httpClient.put(Str.add("/v2/keys/", key),
> handleResponse("set", key, responseHandler));
>
> Buffer buffer = new Buffer(20 + value.length());
>
> addField(buffer, "prevIndex", prevIndex);
>
> buffer.appendString("&value", value);
>
>
> sendToEtcd(httpClientRequest, buffer);
> }
>
>
> private void addField(Buffer buffer, String fieldName, String value) {
>
> try {
> buffer.appendString(fieldName).appendString("=").appendString(URLEncoder.encode(value, StandardCharsets.UTF_8.displayName()));
>
> } catch (UnsupportedEncodingException e) {
> Exceptions.handle(e);
> }
> }
>
>
> private void addField(Buffer buffer, String fieldName, long value) {
> buffer.appendString(fieldName).appendString("=").appendString(Long.toString(value));
>
> }
>
>
> private void addField(Buffer buffer, String fieldName, boolean value) {
> buffer.appendString(fieldName).appendString("=").appendString(Boolean.toString(value));
>
> }
>
> @Override
> public Response setTemp(String key, String value, int ttl) {
>
> final BlockingQueue<Response> responseBlockingQueue = new ArrayBlockingQueue<>(1);
>
> HttpClientRequest httpClientRequest = httpClient.put(Str.add("/v2/keys/", key), getResponseHandler("set", key, responseBlockingQueue));
>
> Buffer buffer = new Buffer(value.length());
>
> addField(buffer, "value", value);
>
> buffer.appendString("&ttl=").appendString(Integer.toString(ttl));
>
> sendToEtcd(httpClientRequest, buffer);
>
>
> return getResponse(key, responseBlockingQueue);
> }
>
> @Override
> public void setTemp(org.boon.core.Handler<Response> responseHandler, String key, String value, int ttl) {
>
> HttpClientRequest httpClientRequest = httpClient.put(Str.add("/v2/keys/", key),
> handleResponse("set", key, responseHandler));
>
> Buffer buffer = new Buffer(value.length());
>
> addField(buffer, "value", value);
>
> buffer.appendString("&ttl=").appendString(Integer.toString(ttl));
>
> sendToEtcd(httpClientRequest, buffer);
>
> }
>
> @Override
> public Response removeTTL(String key, String value) {
>
> final BlockingQueue<Response> responseBlockingQueue = new ArrayBlockingQueue<>(1);
>
> HttpClientRequest httpClientRequest = httpClient.put(Str.add("/v2/keys/", key), getResponseHandler("set", key, responseBlockingQueue));
>
> Buffer buffer = new Buffer(value.length());
>
> addField(buffer, "value", value);
>
> buffer.appendString("&ttl=&");
>
> addField(buffer, "prevExist", true);
>
> sendToEtcd(httpClientRequest, buffer);
>
>
> return getResponse(key, responseBlockingQueue);
> }
>
> @Override
> public void removeTTL(org.boon.core.Handler<Response> responseHandler, String key, String value) {
>
>
> HttpClientRequest httpClientRequest = httpClient.put(Str.add("/v2/keys/", key),
> handleResponse("set", key, responseHandler));
>
> Buffer buffer = new Buffer(value.length());
>
> addField(buffer, "value", value);
>
> buffer.appendString("&ttl=&");
>
> addField(buffer, "prevExist", true);
>
> sendToEtcd(httpClientRequest, buffer);
>
> }
>
> private void sendToEtcd(HttpClientRequest httpClientRequest, Buffer buffer) {
> httpClientRequest.putHeader("Content-Type", "application/x-www-form-urlencoded").end(buffer);
> }
>
> @Override
> public Response get(String key) {
>
> final BlockingQueue<Response> responseBlockingQueue = new ArrayBlockingQueue<>(1);
>
>
> HttpClientRequest httpClientRequest = httpClient.get(Str.add("/v2/keys/", key),
> getResponseHandler("get", key, responseBlockingQueue));
>
> httpClientRequest.end();
>
>
> return getResponse(key, responseBlockingQueue);
> }
>
> @Override
> public void get(org.boon.core.Handler<Response> responseHandler, String key) {
>
>
> HttpClientRequest httpClientRequest = httpClient.get(Str.add("/v2/keys/", key),
> handleResponse("get", key, responseHandler));
>
> httpClientRequest.end();
>
>
>
> }
>
> @Override
> public Response getConsistent(String key) {
>
>
> final BlockingQueue<Response> responseBlockingQueue = new ArrayBlockingQueue<>(1);
>
>
> HttpClientRequest httpClientRequest = httpClient.get(Str.add("/v2/keys/", key, "?consistent=true"),
>
> getResponseHandler("get", key, responseBlockingQueue));
>
> httpClientRequest.end();
>
>
> return getResponse(key, responseBlockingQueue);
> }
>
> @Override
> public void getConsistent(org.boon.core.Handler<Response> responseHandler, String key) {
>
>
> HttpClientRequest httpClientRequest = httpClient.get(Str.add("/v2/keys/", key, "?consistent=true"),
>
> handleResponse("get", key, responseHandler));
>
> httpClientRequest.end();
>
>
> }
>
> @Override
> public Response wait(String key) {
>
>
> final BlockingQueue<Response> responseBlockingQueue = new ArrayBlockingQueue<>(1);
>
>
> HttpClientRequest httpClientRequest = httpClient.get(Str.add("/v2/keys/", key, "?wait=true"),
> getResponseHandler("get", key, responseBlockingQueue));
>
> httpClientRequest.end();
>
>
> return getResponseWaitForever(key, responseBlockingQueue);
> }
>
> @Override
> public void wait(org.boon.core.Handler<Response> responseHandler, String key) {
>
> HttpClientRequest httpClientRequest = httpClient.get(Str.add("/v2/keys/", key, "?wait=true"),
> handleResponse("get", key, responseHandler));
>
> httpClientRequest.end();
>
>
> }
>
> @Override
> public Response wait(String key, long index) {
>
> final BlockingQueue<Response> responseBlockingQueue = new ArrayBlockingQueue<>(1);
>
>
> HttpClientRequest httpClientRequest = httpClient.get(Str.add("/v2/keys/", key, "?wait=true&waitIndex=", Long.toString(index)),
> public void waitRecursive(org.boon.core.Handler<Response> responseHandler, String key, long index) {
>
> HttpClientRequest httpClientRequest = httpClient.get(Str.add("/v2/keys/", key,
> "?wait=true&recursive=true&index=", Long.toString(index)),
> handleResponse("get", key, responseHandler));
>
> httpClientRequest.end();
>
> }
>
> private Response getResponse(String key, BlockingQueue<Response> responseBlockingQueue) {
> try {
> Response response = responseBlockingQueue.poll(timeout, TimeUnit.MILLISECONDS);
> if (response == null) {
> if (this.closed) {
> throw new ConnectionException(Str.add("Connection exception for key ", key, " host ", host, " port ", ""+port));
> }
> throw new TimeoutException(Str.add("Response timeout for get request key=", key));
> }
>
> return response;
> } catch (InterruptedException e) {
> Thread.interrupted();
> }
>
> return null;
> }
>
> private Response getResponseWaitForever(String key, BlockingQueue<Response> responseBlockingQueue) {
> try {
> Response response = responseBlockingQueue.take();
> if (response == null) {
> die("Response timeout for get request key=", key);
> }
>
> return response;
> } catch (InterruptedException e) {
> Thread.interrupted();
> }
>
> return null;
> }
>
> private Handler<HttpClientResponse> getResponseHandler(final String action, final String key, final BlockingQueue<Response> responseBlockingQueue) {
> return new Handler<HttpClientResponse>() {
> @Override
> public void handle(final HttpClientResponse httpClientResponse) {
>
> final Buffer buffer = new Buffer(1000);
>
>
> httpClientResponse.dataHandler(new Handler<Buffer>() {
> @Override
> public void handle(Buffer partialBuf) {
>
> buffer.appendBuffer(partialBuf);
> }
> }).endHandler(new Handler<Void>() {
> @Override
> public void handle(Void aVoid) {
>
> String json = buffer.toString();
>
> Response response = parseResponse(json, action, key, httpClientResponse);
> responseBlockingQueue.offer(response);
>
> }
> }).exceptionHandler(new Handler<Throwable>() {
> @Override
> public void handle(Throwable event) {
>
> Response response = createResponseFromException(action, key, event);
> responseBlockingQueue.offer(response);
> }
> });
> }
> };
> }
>
> private Response createResponseFromException(String action, String key, Throwable throwable) {
>
> if (throwable instanceof ConnectException) {
> closed = true;
>
> Error error = new Error(-1, throwable.getClass().getName(), Str.add("Unable to connect to host ", this.host, " port ", ""+this.port), 0L);
> return new Response(action, -1, error);
>
>
> }
> Error error = new Error(-1, throwable.getClass().getName(), Str.add(throwable.getMessage(),
> " action ", action, " key ", key, " host ", this.host, " port ", ""+this.port), 0L);
> return new Response(action, -1, error);
> }
>
> private Handler<HttpClientResponse> handleResponse(final String action, final String key, final org.boon.core.Handler<Response> handler) {
> return new Handler<HttpClientResponse>() {
> @Override
> public void handle(final HttpClientResponse httpClientResponse) {
>
> final Buffer buffer = new Buffer(1000);
>
> httpClientResponse.dataHandler(new Handler<Buffer>() {
> @Override
> public void handle(Buffer partialBuf) {
>
> buffer.appendBuffer(partialBuf);
> }
> }).endHandler(new Handler<Void>() {
> @Override
> public void handle(Void aVoid) {
> String json = buffer.toString();
> Response response = parseResponse(json, action, key, httpClientResponse);
> handler.handle(response);
> }
> }).exceptionHandler(new Handler<Throwable>() {
> @Override
> public void handle(Throwable event) {
>
> Response response = createResponseFromException(action, key, event);
> handler.handle(response);
> }
> });
> }
> };
> }
>
> private Response parseResponse(String json, String action, String key, HttpClientResponse httpClientResponse) {
>     try {
>
>         Response response;
>
>         switch (httpClientResponse.statusCode()) {
>
>             case 307:
>                 // Temporary redirect: pass the new location back to the caller.
>                 response = new RedirectResponse(httpClientResponse.headers().get("Location"));
>                 return response;
>
>             case 200:
>                 response = jsonParserAndMapperThreadLocal.get().parse(Response.class, json);
>                 response.setHttpStatusCode(httpClientResponse.statusCode());
>                 return response;
>
>             case 201:
>                 response = jsonParserAndMapperThreadLocal.get().parse(Response.class, json);
>                 response.setHttpStatusCode(httpClientResponse.statusCode());
>                 response.setCreated();
>                 return response;
>
>             case 404:
>                 Error notFound = jsonParserAndMapperThreadLocal.get().parse(Error.class, json);
>                 response = new Response(action, httpClientResponse.statusCode(), notFound);
>                 return response;
>
>             default:
>                 if (!isEmpty(json) && (json.contains("cause") || json.contains("errorCode"))) {
>                     // The body looks like an error payload.
>                     Error error = jsonParserAndMapperThreadLocal.get().parse(Error.class, json);
>                     response = new Response(action, httpClientResponse.statusCode(), error);
>                     return response;
>                 } else if (!isEmpty(json)) {
>                     response = jsonParserAndMapperThreadLocal.get().parse(Response.class, json);
>                     response.setHttpStatusCode(httpClientResponse.statusCode());
>                     return response;
>                 } else {
>                     // Empty body with an unhandled status: log it. Note that this
>                     // returns null, which handleResponse passes straight to the handler.
>                     puts(httpClientResponse.statusCode(), httpClientResponse.headers().entries());
>                     return null;
>                 }
>         }
>
>     } catch (Exception ex) {
>
>         if (!Str.isEmpty(json)) {
>             return createResponseFromException(action + "\n" + json + "\n", key, ex);
>         } else {
>             return createResponseFromException(action + " blank response", key, ex);
>         }
>     }
> }
>
> private void connect() {
>     httpClient = vertx.createHttpClient()
>             .setHost(host)
>             .setPort(port)
>             .setConnectTimeout(timeout)
>             .setMaxPoolSize(20)
>             .exceptionHandler(new Handler<Throwable>() {
>                 @Override
>                 public void handle(Throwable throwable) {
>                     if (throwable instanceof ConnectException) {
>                         closed = true;
>                     } else {
>                         puts(throwable);
>                         throwable.printStackTrace();
>                     }
>                 }
>             });
> }
>
> public boolean isClosed() {
>     return closed;
> }
> }
>
>

Rick Hightower

Oct 9, 2014, 2:53:22 PM10/9/14
to ve...@googlegroups.com
No worries. I believe I lifted that right from their website, so take it up with them. I should have quoted it. It was late. There was vodka.

--
Rick Hightower
(415) 968-9037

Jordan Halterman

Oct 9, 2014, 3:16:56 PM10/9/14
to ve...@googlegroups.com
Hahaha

Sent from my iPhone