There are already some posts about server push with GWT, implemented with
Jetty+Continuations.
Greg Wilkins describes in http://docs.codehaus.org/display/JETTY/GWT
how to implement the server side.
I implemented a GWT client which uses a technique similar to the one
proposed in
http://groups.google.com/group/Google-Web-Toolkit/browse_thread/thread/5233cf1fc6f2a1a7/3c6aba6a3ae9ec81?lnk=gst&q=server+push+GWT+stack&rnum=1#3c6aba6a3ae9ec81
And it works well.
Due to restrictions in my project I will now probably have to use
Tomcat.
Tomcat 6.0 has the CometProcessor and connectors which use NIO.
Does anybody have experience with a server-side implementation using
Tomcat and perhaps the CometProcessor?
I am still searching for a solution to couple the RemoteServiceServlet
of GWT with the CometProcessor.
Any information or further questions regarding this topic are welcome.
Best regards
Achim
As you are the developer of the rocket-gwt library (right?), I would like to
discuss some points with you and explain why the rocket-comet
solution is not an option for us:
The CometServerServlet uses basic IO, not NIO. Every request therefore
occupies a thread on the server, which in my opinion does not scale to a
lot of concurrent requests.
What are your experiences with rocket-gwt applications and Tomcat? How
many concurrent users are possible?
It is also not a solution for long-running RPC calls, since it does not
allow real RPC calls.
We want to make RPC calls with varying arguments which can take a long
time on the server.
Best regards
Achim
It therefore seems natural to me to introduce asynchronous server
calls: a call is invoked on the server, the service implementation
reports through an AsyncCallback that the result for the call is
available, and then the server writes the response to the client.
Suppose the following service which is to be made asynchronous on the
server side:
public interface EchoService
extends RemoteService
{
public String echoString(String text);
}
This service has an asynchronous version:
public interface EchoServiceAsync
extends AsyncRemoteService
{
public void echoString(String text,AsyncCallback callback);
}
The client is implemented as before.
But the server side service implementation uses a new
AsyncRemoteServiceServlet which is based on the RemoteServiceServlet
(I used gwt-1.3.3 by the way).
The AsyncRemoteServiceServlet extends HttpServlet and implements
CometProcessor.
The EchoServlet now extends AsyncRemoteServiceServlet and implements
EchoServiceAsync, not EchoService.
The echoString method may now put the callback object into a queue
which may be used by another thread which provides the result at some
later time.
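A minimal sketch of that hand-off, not the code of the actual servlet: `Callback` is a stand-in for GWT's AsyncCallback so that the example compiles on its own, and `PendingEcho` and `EchoWorker` are hypothetical names.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Stand-in for com.google.gwt.user.client.rpc.AsyncCallback (assumption).
interface Callback {
    void onSuccess(Object result);
    void onFailure(Throwable caught);
}

// Hypothetical holder for one queued call: the argument plus the callback to answer later.
final class PendingEcho {
    final String text;
    final Callback callback;

    PendingEcho(String text, Callback callback) {
        this.text = text;
        this.callback = callback;
    }
}

// Hypothetical worker: the service method only enqueues, another thread produces the result.
final class EchoWorker implements Runnable {
    private final BlockingQueue<PendingEcho> queue = new LinkedBlockingQueue<PendingEcho>();

    // Called on the request thread; returns immediately without blocking it.
    void echoString(String text, Callback callback) {
        queue.add(new PendingEcho(text, callback));
    }

    // Completes one queued call if there is one; returns false when the queue is empty.
    boolean completeNext() {
        PendingEcho call = queue.poll();
        if (call == null) {
            return false;
        }
        call.callback.onSuccess(call.text);
        return true;
    }

    // Worker loop: blocks until a call is queued, answers it, repeats until interrupted.
    public void run() {
        try {
            while (true) {
                PendingEcho call = queue.take();
                call.callback.onSuccess(call.text);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag and stop
        }
    }
}
```

The worker would be started once, for example with `new Thread(worker).start()` when the servlet is initialized, so that request threads are released as soon as the call is queued.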
The AsyncRemoteServiceServlet has to handle the CometProcessor events
BEGIN, READ, END and ERROR.
On BEGIN an AsyncCall object is created which is to collect the
request payload until the request payload is completely available. The
AsyncCall object is stored in a Map with the CometEvent object as a
key.
On READ the AsyncCall object is retrieved from the map and the bytes
available to be read are added to the request payload in the AsyncCall
object.
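The BEGIN/READ bookkeeping can be sketched roughly as follows. This is an illustration under assumptions, not the servlet itself: `PayloadCollector` is a hypothetical name, and a plain `Object` stands in for the CometEvent used as the map key. The sketch collects raw bytes and decodes them only once at the end, so a multi-byte UTF-8 character that is split across two reads is still decoded correctly.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper; "connection" stands in for the CometEvent used as map key.
final class PayloadCollector {
    private final Map<Object, ByteArrayOutputStream> pending =
            new ConcurrentHashMap<Object, ByteArrayOutputStream>();

    // BEGIN: start collecting the payload for this connection.
    void begin(Object connection) {
        pending.put(connection, new ByteArrayOutputStream());
    }

    // READ: append the n bytes that were just read from the input stream.
    void read(Object connection, byte[] buf, int n) {
        ByteArrayOutputStream out = pending.get(connection);
        if (out == null) {
            throw new IllegalStateException("no BEGIN seen for " + connection);
        }
        out.write(buf, 0, n);
    }

    // Payload complete: remove the entry and decode all bytes in one go.
    // Returns null if nothing was registered for this connection.
    String finish(Object connection) {
        ByteArrayOutputStream out = pending.remove(connection);
        if (out == null) {
            return null;
        }
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }

    // Number of connections whose payload is still being collected.
    int size() {
        return pending.size();
    }
}
```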
I had some problems detecting the end of the request payload, since the
content-length (which is available on BEGIN) is only a hint about the
real length, right? I am now detecting the end of the payload by
examining the last 20 bytes.
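An alternative that may be worth trying: as far as I know, when a request carries a Content-Length header and is not sent with chunked transfer encoding, the header gives the exact number of body bytes (bytes, not characters). Assuming that holds for the GWT client requests, the end of the payload could be detected by counting bytes instead of inspecting the tail. `BodyProgress` below is a hypothetical helper, not part of GWT or Tomcat.

```java
// Hypothetical helper: compares the body bytes received so far with the declared Content-Length.
final class BodyProgress {
    private final int contentLength;
    private int bytesRead;

    BodyProgress(int contentLength) {
        if (contentLength < 0) {
            throw new IllegalArgumentException("Content-Length must be known");
        }
        this.contentLength = contentLength;
    }

    // Record the return value of InputStream.read(buf); values <= 0 add nothing.
    void add(int n) {
        if (n > 0) {
            bytesRead += n;
        }
    }

    // True once at least contentLength bytes have arrived.
    boolean isComplete() {
        return bytesRead >= contentLength;
    }

    // Bytes still expected, never negative.
    int remaining() {
        return Math.max(0, contentLength - bytesRead);
    }
}
```

In the READ handler this would mean calling `add(n)` after each `read` and treating the payload as complete as soon as `isComplete()` returns true. Note that the count has to be taken from the raw byte count, not from the length of the decoded string.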
When the end is detected the payload is given to a modified version of
the processCall method:
the method deserializes the service interface name, the method name
and the arguments from the payload, gets the async interface by
reflection, creates a special implementation of the AsyncCallback
object and then invokes the method of the async interface.
When the onSuccess method of the callback object is called the result
is serialized and written to the response stream using the same
methods as in RemoteServiceServlet. Afterwards the CometEvent is
closed.
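The reflection step can be illustrated in isolation. The following is only a sketch under assumptions: `ReplyCallback` stands in for GWT's AsyncCallback, and `GreetAsync`, `GreetImpl` and `AsyncDispatch` are hypothetical names. It shows the core idea of looking up the async method by appending the callback type to the deserialized parameter types, and then invoking it with the callback appended to the arguments.

```java
import java.lang.reflect.Method;
import java.util.Arrays;

// Stand-in for GWT's AsyncCallback (assumption, reduced to one method).
interface ReplyCallback {
    void onSuccess(Object result);
}

// Hypothetical async service interface.
interface GreetAsync {
    void greet(String name, ReplyCallback callback);
}

// Hypothetical implementation that answers immediately.
final class GreetImpl implements GreetAsync {
    public void greet(String name, ReplyCallback callback) {
        callback.onSuccess("Hello " + name);
    }
}

final class AsyncDispatch {
    // Looks up methodName(paramTypes..., ReplyCallback) on the async interface.
    // Class.getMethod also searches superinterfaces. Returns null if not found.
    static Method findAsyncMethod(Class<?> asyncIntf, String methodName, Class<?>[] paramTypes) {
        Class<?>[] withCallback = Arrays.copyOf(paramTypes, paramTypes.length + 1);
        withCallback[paramTypes.length] = ReplyCallback.class;
        try {
            return asyncIntf.getMethod(methodName, withCallback);
        } catch (NoSuchMethodException e) {
            return null;
        }
    }

    // Appends the callback to the deserialized arguments and invokes the async method.
    static void invoke(Object target, Method method, Object[] args, ReplyCallback callback)
            throws ReflectiveOperationException {
        Object[] withCallback = Arrays.copyOf(args, args.length + 1);
        withCallback[args.length] = callback;
        method.invoke(target, withCallback);
    }
}
```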
I got it working so far and I hope it is scalable.
My implementation is still missing
- handling of exceptions
- handling of timeouts
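One possible direction for both missing pieces, sketched under assumptions: whichever of success, failure or timeout happens first answers the client, and anything that arrives later is ignored. `Responder` stands in for the code that writes the response and closes the CometEvent, `OneShotReply` is a hypothetical name, and the payload strings are placeholders, not the real GWT response format.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Stand-in for "write the response and close the event" (assumption).
interface Responder {
    void respond(String payload);
}

// Hypothetical wrapper that guarantees at most one response per call.
final class OneShotReply {
    private final AtomicBoolean done = new AtomicBoolean(false);
    private final Responder responder;

    OneShotReply(Responder responder) {
        this.responder = responder;
    }

    // Each method returns true if it was the one that answered the client.
    boolean onSuccess(Object result) {
        return finish("OK:" + result); // placeholder payload
    }

    boolean onFailure(Throwable caught) {
        return finish("ERROR:" + caught.getMessage()); // placeholder payload
    }

    boolean onTimeout() {
        return finish("TIMEOUT"); // placeholder payload
    }

    // Only the first caller wins the compareAndSet and gets to respond.
    private boolean finish(String payload) {
        if (!done.compareAndSet(false, true)) {
            return false;
        }
        responder.respond(payload);
        return true;
    }
}
```

The timeout itself could be triggered by scheduling `onTimeout()` on a `java.util.concurrent.ScheduledExecutorService` when the call is dispatched. A late `onSuccess` from the worker thread would then simply return false instead of writing to an already closed response.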
Ready for comments
Achim
I have not tried either the Tomcat or the Jetty NIO extensions.
The new AsyncRemoteServiceServlet is copied from the
RemoteServiceServlet of GWT 1.3.3. I will only paste the parts which
changed:
public class AsyncRemoteServiceServlet
extends HttpServlet
implements CometProcessor
{
...
/**
* Find the invoked method on either the specified interface or any
* super.
*/
private static Method findInterfaceMethod(
Class intf,
String methodName,
Class[] paramTypes,
boolean includeInherited,
boolean findAsyncMethod)
{
Class[] enhParamTypes = null;
if (findAsyncMethod) {
/* enhance paramTypes with additional type AsyncCallback */
enhParamTypes = new Class[paramTypes.length+1];
for (int i=0;i<paramTypes.length;i++) {
enhParamTypes[i] = paramTypes[i];
}
enhParamTypes[paramTypes.length] =
AsyncCallback.class;
} else {
enhParamTypes = paramTypes;
}
try {
return intf.getDeclaredMethod(methodName, enhParamTypes);
} catch (NoSuchMethodException e) {
if (includeInherited) {
Class[] superintfs = intf.getInterfaces();
for (int i = 0; i < superintfs.length; i++) {
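/* pass the original paramTypes: the recursive call appends
AsyncCallback itself, passing enhParamTypes would append it twice */
Method method = findInterfaceMethod(superintfs[i],
methodName,
paramTypes, true, findAsyncMethod);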
if (method != null) {
return method;
}
}
}
return null;
}
}
...
private final Map asyncCalls = new WeakHashMap();
...
public String processCall(AsyncCall asyncCall)
throws SerializationException
{
final String METHOD = "processCall: ";
String payload = asyncCall.getRequestPayload().toString();
// Let subclasses see the serialized request.
//
onBeforeRequestDeserialized(payload);
// Create a stream to deserialize the request.
//
ServerSerializationStreamReader streamReader =
new ServerSerializationStreamReader(
serializableTypeOracle);
streamReader.prepareToRead(payload);
// Read the service interface
//
String serviceIntfName = streamReader.readString();
log(METHOD+"serviceIntfName="+serviceIntfName);
/* but we need the async version of the service */
String asyncServiceIntfName = serviceIntfName+"Async";
// TODO(mmendez): need a way to check the type signature of the
// service intf
// Verify that this very servlet implements the specified
// interface name.
//
if (!isImplementedAsyncRemoteServiceInterface(asyncServiceIntfName)) {
// Bad payload, possible hack attempt.
//
throw new SecurityException(
"Blocked attempt to access interface '"
+ asyncServiceIntfName
+ "', which is either not implemented by this servlet or "
+ "which doesn't extend AsyncRemoteService; this is either "
+ "misconfiguration or a hack attempt");
}
// Actually get the service interface, so that we can query its
// methods.
//
Class asyncServiceIntf;
try {
asyncServiceIntf = getClassFromName(asyncServiceIntfName);
} catch (ClassNotFoundException e) {
throw new SerializationException("Unknown service interface class '"
+ asyncServiceIntfName + "'", e);
}
/*
* the synchronous version of the interface is needed to
* determine the return type of the result which
* is needed for the serialization of the response payload
*/
Class serviceIntf;
try {
serviceIntf = getClassFromName(serviceIntfName);
} catch (ClassNotFoundException e) {
throw new SerializationException("Unknown service interface class '"
+ serviceIntfName + "'", e);
}
// Read the method name.
//
String methodName = streamReader.readString();
log(METHOD+"methodName="+methodName);
// Read the number and names of the parameter classes from the
// stream. We have to do this so that we can find the correct
// overload of the method.
//
int paramCount = streamReader.readInt();
Class[] paramTypes = new Class[paramCount];
for (int i = 0; i < paramTypes.length; i++) {
String paramClassName = streamReader.readString();
try {
paramTypes[i] = getClassOrPrimitiveFromName(paramClassName);
} catch (ClassNotFoundException e) {
throw new SerializationException("Unknown parameter " + i + " type '"
+ paramClassName + "'", e);
}
}
// For security, make sure the method is found in the service
// interface and not just one that happens to be defined on this
// class.
//
Method asyncServiceIntfMethod = findInterfaceMethod(
asyncServiceIntf,
methodName,
paramTypes,
true,
true);
log(METHOD+"asyncServiceIntfMethod="+asyncServiceIntfMethod);
// If it wasn't found, don't continue.
//
if (asyncServiceIntfMethod == null) {
// Bad payload, possible hack attempt.
//
throw new SecurityException(
"Method '"
+ methodName
+ "' (or a particular overload) on interface '"
+ asyncServiceIntfName
+ "' was not found, this is either misconfiguration or a "
+ "hack attempt");
}
/* find the synchronous interface method */
Method serviceIntfMethod =
findInterfaceMethod(
serviceIntf,
methodName,
paramTypes,
true,
false);
if (serviceIntfMethod == null) {
// Bad payload, possible hack attempt.
//
throw new SecurityException(
"Method '"
+ methodName
+ "' (or a particular overload) on interface '"
+ serviceIntfName
+ "' was not found, this is either misconfiguration or "
+ "a hack attempt");
}
Class returnType = serviceIntfMethod.getReturnType();
log(METHOD+"returnType="+returnType);
// Deserialize the parameters.
Object[] args = new Object[paramCount+1];
for (int i = 0; i < args.length-1; i++) {
args[i] = streamReader.deserializeValue(paramTypes[i]);
}
/* create a new AsyncCallback */
AsyncCallback callback = createAsyncCallback(
asyncCall.getEvent(),
returnType);
args[paramCount] = callback;
// Make the call via reflection.
//
String responsePayload = GENERIC_FAILURE_MSG;
ServerSerializationStreamWriter streamWriter =
new ServerSerializationStreamWriter(
serializableTypeOracle);
Throwable caught = null;
try {
/* the async method has return type void */
asyncServiceIntfMethod.invoke(this, args);
// TODO stop handling of event here
// responsePayload = createResponse(streamWriter, returnType,
// returnVal, false);
} catch (IllegalArgumentException e) {
caught = e;
} catch (IllegalAccessException e) {
caught = e;
} catch (InvocationTargetException e) {
// Try to serialize the caught exception if the client is
// expecting it, otherwise log the exception server-side.
caught = e;
Throwable cause = e.getCause();
if (cause != null) {
// Update the caught exception to the underlying cause
caught = cause;
// Serialize the exception back to the client if it's a
// declared exception
if (isExpectedException(serviceIntfMethod, cause)) {
Class thrownClass = cause.getClass();
responsePayload = createResponse(streamWriter, thrownClass,
cause,
true);
// Don't log the exception on the server
caught = null;
}
}
}
if (caught != null) {
responsePayload = GENERIC_FAILURE_MSG;
ServletContext servletContext = getServletContext();
// servletContext may be null (for example, when unit testing)
if (servletContext != null) {
// Log the exception server side
servletContext.log("Exception while dispatching incoming RPC call",
caught);
}
}
// Let subclasses see the serialized response.
//
// onAfterResponseSerialized(responsePayload);
return responsePayload;
}
...
private boolean isImplementedAsyncRemoteServiceInterface(String
intfName) {
synchronized (knownImplementedInterfaces) {
// See if it's cached.
//
if (knownImplementedInterfaces.contains(intfName)) {
return true;
}
Class cls = getClass();
// Unknown, so walk up the class hierarchy to find the first
// class that implements the requested interface
//
while ((cls != null) && !
AsyncRemoteServiceServlet.class.equals(cls)) {
Class[] intfs = cls.getInterfaces();
for (int i = 0; i < intfs.length; i++) {
Class intf = intfs[i];
if (isImplementedRemoteServiceInterfaceRecursive(intfName,
intf)) {
knownImplementedInterfaces.add(intfName);
return true;
}
}
// did not find the interface in this class so we look in the
// superclass
cls = cls.getSuperclass();
}
return false;
}
}
...
private boolean isImplementedRemoteServiceInterfaceRecursive(String
intfName,
Class intfToCheck) {
assert (intfToCheck.isInterface());
if (intfToCheck.getName().equals(intfName)) {
// The name is right, but we also verify that it is assignable
// to AsyncRemoteService.
//
return AsyncRemoteService.class.isAssignableFrom(intfToCheck);
}
Class[] intfs = intfToCheck.getInterfaces();
for (int i = 0; i < intfs.length; i++) {
Class intf = intfs[i];
if (isImplementedRemoteServiceInterfaceRecursive(intfName,
intf)) {
return true;
}
}
return false;
}
...
private AsyncCallback createAsyncCallback(
CometEvent event,
Class returnType)
{
AsyncCallbackImpl result = new AsyncCallbackImpl();
result.setRequest(event.getHttpServletRequest());
result.setContext(getServletContext());
result.setEvent(event);
result.setServlet(this);
result.setReturnType(returnType);
return result;
}
public void event(CometEvent event)
throws IOException,ServletException
{
if (event.getEventType()==CometEvent.EventType.BEGIN) {
handleBeginEvent(event);
} else if (event.getEventType()==CometEvent.EventType.READ) {
handleReadEvent(event);
} else if (event.getEventType()==CometEvent.EventType.END) {
handleEndEvent(event);
} else if (event.getEventType()==CometEvent.EventType.ERROR) {
handleErrorEvent(event);
} else {
throw new ServletException("unknown CometEvent");
}
}
private void handleBeginEvent(CometEvent event)
throws ServletException
{
final String METHOD = "handleBeginEvent: ";
log(METHOD+"entered: event="+event);
AsyncCall asyncCall = new AsyncCall(event);
registerAsyncCall(asyncCall);
log(METHOD+"asyncCall="+asyncCall);
log(METHOD+"leaving");
}
private void handleReadEvent(CometEvent event)
throws ServletException, IOException
{
final String METHOD = "handleReadEvent: ";
log(METHOD+"entered: event="+event);
AsyncCall asyncCall = (AsyncCall) asyncCalls.get(event);
if (asyncCall==null) {
throw new ServletException(
"no AsyncCall found for event (implementation error): "+event);
}
if (asyncCall.isRequestPayloadComplete()) {
log(METHOD+"requestPayload already complete");
return;
}
log(METHOD+"asyncCall="+asyncCall);
HttpServletRequest request = event.getHttpServletRequest();
InputStream is = request.getInputStream();
byte[] buf = new byte[512];
int n = 0;
do {
n = is.read(buf); //can throw an IOException
if (n > 0) {
log(METHOD+"read " + n + " bytes");
String chunk = new String(buf, 0, n,"UTF-8");
asyncCall.appendToRequestPayload(chunk);
log(METHOD+"asyncCall="+asyncCall);
} else if (n < 0) {
log(METHOD+"input stream is closed");
return;
}
} while ((is.available() > 0) || (!isEndDetected(buf,n)));
if (isEndDetected(buf, n)) {
asyncCall.setRequestPayloadComplete(true);
try {
handleCompleteRequestPayload(asyncCall);
} catch (SerializationException e) {
finishWithError(event.getHttpServletResponse(), e);
}
} else {
log(METHOD+"requestPayload is not complete yet");
}
log(METHOD+"leaving");
}
private boolean isEndDetected(byte[] chunk,int n)
{
/* chunk must have at least 20 bytes; check this before indexing,
otherwise n-20 is a negative array index */
if (n<20) {
return false;
}
/* in each of the last five 4-byte groups the 3rd and 4th byte
must be -65 (0xBF) */
for (int i=n-20;i<n;i+=4) {
if ((chunk[i+2]!=-65) || (chunk[i+3]!=-65)) {
return false;
}
}
return true;
}
private void handleCompleteRequestPayload(AsyncCall asyncCall)
throws SerializationException
{
final String METHOD = "handleCompleteRequestPayload: ";
log(METHOD+"entered: requestPayload="+asyncCall.getRequestPayload());
processCall(asyncCall);
removeAsyncCall(asyncCall);
}
private void handleEndEvent(CometEvent event)
{
final String METHOD = "handleEndEvent: ";
log(METHOD+"entered: event="+event);
}
private void handleErrorEvent(CometEvent event)
{
final String METHOD = "handleErrorEvent: ";
log(METHOD+"entered: event="+event);
}
private void registerAsyncCall(AsyncCall asyncCall)
{
asyncCalls.put(asyncCall.getEvent(),asyncCall);
}
private void removeAsyncCall(AsyncCall asyncCall)
{
/* the map is keyed by the CometEvent, not by the AsyncCall */
asyncCalls.remove(asyncCall.getEvent());
}
...
}
The AsyncCall is a holder for the request payload until it is
complete:
package com.google.gwt.user.server.rpc;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import org.apache.catalina.CometEvent;
public class AsyncCall
{
private int contentLength;
private StringBuilder requestPayload;
private int bytesRead;
private CometEvent event;
private boolean requestPayloadComplete;
public void setRequestPayloadComplete(boolean requestPayloadComplete)
{
this.requestPayloadComplete = requestPayloadComplete;
}
public int getBytesRead() {
return bytesRead;
}
public StringBuilder getRequestPayload() {
return requestPayload;
}
public CometEvent getEvent() {
return event;
}
public AsyncCall(CometEvent event)
throws ServletException
{
this.event = event;
HttpServletRequest request = event.getHttpServletRequest();
contentLength = request.getContentLength();
if (contentLength == -1) {
// Content length must be known.
throw new ServletException("Content-Length must be specified");
}
String contentType = request.getContentType();
boolean contentTypeIsOkay = false;
// Content-Type must be specified.
if (contentType != null) {
// The type must be plain text.
if (contentType.startsWith("text/plain")) {
// And it must be UTF-8 encoded (or unspecified, in which
// case we assume that it's either UTF-8 or ASCII).
if (contentType.indexOf("charset=") == -1) {
contentTypeIsOkay = true;
} else if (contentType.indexOf("charset=utf-8") != -1) {
contentTypeIsOkay = true;
}
}
}
if (!contentTypeIsOkay) {
throw new ServletException(
"Content-Type must be 'text/plain' with 'charset=utf-8' (or "
+ "unspecified charset)");
}
requestPayload = new StringBuilder();
bytesRead = 0;
}
public boolean isRequestPayloadComplete()
{
return requestPayloadComplete;
}
public void appendToRequestPayload(String chunk)
{
requestPayload.append(chunk);
bytesRead += chunk.length();
}
public String toString()
{
return "AsyncCall[contentLength="+contentLength
+",bytesRead="+bytesRead
+",requestPayload="+requestPayload
+"]";
}
}
The AsyncCallbackImpl is the implementation which is given to the
async service method. onFailure is not implemented yet, onSuccess
sends the response to the client:
package com.google.gwt.user.server.rpc;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import javax.servlet.ServletContext;
import javax.servlet.http.HttpServletRequest;
import org.apache.catalina.CometEvent;
import com.google.gwt.user.client.rpc.AsyncCallback;
import com.google.gwt.user.rebind.rpc.SerializableTypeOracle;
import com.google.gwt.user.server.rpc.impl.ServerSerializableTypeOracle;
import com.google.gwt.user.server.rpc.impl.ServerSerializableTypeOracleImpl;
import com.google.gwt.user.server.rpc.impl.ServerSerializationStreamWriter;
public class AsyncCallbackImpl
implements AsyncCallback
{
private HttpServletRequest request;
private ServletContext context;
private CometEvent event;
private AsyncRemoteServiceServlet servlet;
private Class returnType;
public Class getReturnType() {
return returnType;
}
public void setReturnType(Class returnType) {
this.returnType = returnType;
}
public AsyncRemoteServiceServlet getServlet() {
return servlet;
}
public void setServlet(AsyncRemoteServiceServlet servlet) {
this.servlet = servlet;
}
public CometEvent getEvent() {
return event;
}
public void setEvent(CometEvent event) {
this.event = event;
}
public ServletContext getContext() {
return context;
}
public void setContext(ServletContext context) {
this.context = context;
}
public HttpServletRequest getRequest() {
return request;
}
public void setRequest(HttpServletRequest request) {
this.request = request;
}
public void onFailure(Throwable caught)
{
final String METHOD = "onFailure: ";
context.log(METHOD+"caught="+caught);
}
public void onSuccess(Object result)
{
final String METHOD = "onSuccess: ";
context.log(METHOD+"result="+result);
try {
ServerSerializationStreamWriter streamWriter =
new ServerSerializationStreamWriter(
servlet.serializableTypeOracle);
String responsePayload =
servlet.createResponse(
streamWriter,
returnType,
result,
false);
context.log(METHOD+"responsePayload="+responsePayload);
servlet.writeResponse(
request,
event.getHttpServletResponse(),
responsePayload);
context.log(METHOD+"closing event: "+event);
event.close();
} catch (IOException e) {
context.log(METHOD+"failed to close event: "+e.getMessage(),e);
}
}
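/* the listeners field was used but not declared in the posted code;
AsyncCallbackListener is not shown in this post */
private final Set listeners = new HashSet();
void addListener(AsyncCallbackListener listener)
{
listeners.add(listener);
}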
}
An example implementation of the AsyncRemoteServiceServlet is:
package de.freenet.tomcatgwt.service;
import javax.servlet.ServletException;
import com.google.gwt.user.client.rpc.AsyncCallback;
import com.google.gwt.user.server.rpc.AsyncRemoteServiceServlet;
import de.freenet.tomcatgwt.client.EchoServiceAsync;
@SuppressWarnings("serial")
public class EchoServlet
extends AsyncRemoteServiceServlet
implements EchoServiceAsync
{
public void echoString(
String text,
AsyncCallback callback)
{
final String METHOD = "echoString: ";
log(METHOD+"entered");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
// restore the interrupt flag instead of swallowing it
Thread.currentThread().interrupt();
}
callback.onSuccess(text);
log(METHOD+"leaving");
}
}
Best regards
Achim