Adding ability to issue close(SERVER_ERROR) on unhandled exception during parse/notify flows

This commit is contained in:
Joakim Erdfelt 2012-06-29 15:12:05 -07:00
parent 3fa89ea577
commit 0a301b153e
7 changed files with 131 additions and 53 deletions

View File

@ -100,8 +100,8 @@ public class EventMethod
}
catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e)
{
LOG.warn("Cannot call method {} on {} with {}",method,pojo,args);
LOG.warn(e);
String err = String.format("Cannot call method %s on %s with args: %s",method,pojo,args);
throw new WebSocketException(err,e);
}
}

View File

@ -1,5 +1,6 @@
package org.eclipse.jetty.websocket.api;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.BufferUtil;
@ -100,51 +101,58 @@ public class WebSocketEventDriver implements Parser.Listener
return;
}
// Specified Text Case
if ((frame instanceof TextFrame) && (events.onText != null))
try
{
TextFrame text = (TextFrame)frame;
events.onText.call(websocket,connection,text.getPayloadUTF8());
return;
}
// Specified Binary Case
if ((frame instanceof BinaryFrame) && (events.onBinary != null))
{
BinaryFrame bin = (BinaryFrame)frame;
if (events.onBinary.isParameterPresent(ByteBuffer.class))
// Specified Text Case
if ((frame instanceof TextFrame) && (events.onText != null))
{
// Byte buffer approach
events.onBinary.call(websocket,connection,bin.getPayload());
}
else
{
// Byte array approach
byte buf[] = BufferUtil.toArray(bin.getPayload());
events.onBinary.call(websocket,connection,buf,0,buf.length);
}
return;
}
// Basic Hierarchy Case
Class<? extends BaseFrame> frameType = frame.getClass();
while (true)
{
EventMethod event = events.getOnFrame(frameType);
if (event != null)
{
event.call(websocket,connection,frame);
TextFrame text = (TextFrame)frame;
events.onText.call(websocket,connection,text.getPayloadUTF8());
return;
}
if (!BaseFrame.class.isAssignableFrom(frameType.getSuperclass()))
// Specified Binary Case
if ((frame instanceof BinaryFrame) && (events.onBinary != null))
{
// not assignable
BinaryFrame bin = (BinaryFrame)frame;
if (events.onBinary.isParameterPresent(ByteBuffer.class))
{
// Byte buffer approach
events.onBinary.call(websocket,connection,bin.getPayload());
}
else
{
// Byte array approach
byte buf[] = BufferUtil.toArray(bin.getPayload());
events.onBinary.call(websocket,connection,buf,0,buf.length);
}
return;
}
frameType = (Class<? extends BaseFrame>)frameType.getSuperclass();
// Basic Hierarchy Case
Class<? extends BaseFrame> frameType = frame.getClass();
while (true)
{
EventMethod event = events.getOnFrame(frameType);
if (event != null)
{
event.call(websocket,connection,frame);
return;
}
if (!BaseFrame.class.isAssignableFrom(frameType.getSuperclass()))
{
// not assignable
return;
}
frameType = (Class<? extends BaseFrame>)frameType.getSuperclass();
}
}
catch (Throwable t)
{
unhandled(t);
}
}
@ -172,4 +180,28 @@ public class WebSocketEventDriver implements Parser.Listener
{
this.connection = conn;
}
private void unhandled(Throwable t)
{
LOG.warn("Unhandled Error (closing connection)",t);
// Unhandled Error, close the connection.
try
{
switch (policy.getBehavior())
{
case SERVER:
connection.close(StatusCode.SERVER_ERROR,t.getClass().getSimpleName());
break;
case CLIENT:
connection.close(StatusCode.POLICY_VIOLATION,t.getClass().getSimpleName());
break;
}
}
catch (IOException e)
{
LOG.debug(e);
}
}
}

View File

@ -75,9 +75,14 @@ public class Parser
{
listener.onFrame(f);
}
catch (WebSocketException e)
{
notifyWebSocketException(e);
}
catch (Throwable t)
{
LOG.warn(t);
notifyWebSocketException(new WebSocketException(t));
}
}
}
@ -116,6 +121,8 @@ public class Parser
throw new WebSocketException("Unknown opcode: " + opc);
}
LOG.debug("OpCode {}, fin={}",opcode.name(),fin);
if (opcode.isControlFrame() && !fin)
{
throw new WebSocketException("Fragmented Control Frame [" + opcode.name() + "]");

View File

@ -68,13 +68,13 @@ public class AsyncWebSocketConnection extends AbstractAsyncConnection implements
@Override
public void close() throws IOException
{
write(new CloseFrame(StatusCode.NORMAL));
terminateConnection(StatusCode.NORMAL,null);
}
@Override
public void close(int statusCode, String reason) throws IOException
{
write(new CloseFrame(statusCode,reason));
terminateConnection(statusCode,reason);
}
private int fill(AsyncEndPoint endPoint, ByteBuffer buffer)
@ -207,16 +207,18 @@ public class AsyncWebSocketConnection extends AbstractAsyncConnection implements
* @param statusCode
* the WebSocket status code.
* @param reason
* the (optiona) reason string. (null is allowed)
* the (optional) reason string. (null is allowed)
* @see StatusCode
*/
private void terminateConnection(short statusCode, String reason)
private void terminateConnection(int statusCode, String reason)
{
CloseFrame close = new CloseFrame(statusCode,reason);
// fire and forget -> close frame
ByteBuffer buf = bufferPool.acquire(policy.getBufferSize(),false);
BufferUtil.clearToFill(buf);
generator.generate(buf,close);
BufferUtil.flipToFlush(buf,0);
getEndPoint().write(null,new WebSocketCloseCallback(this,buf),buf);
}
@ -226,6 +228,9 @@ public class AsyncWebSocketConnection extends AbstractAsyncConnection implements
return String.format("%s{g=%s,p=%s}",super.toString(),generator,parser);
}
/**
* {@inheritDoc}
*/
@Override
public void write(BaseFrame frame) throws IOException
{
@ -252,6 +257,9 @@ public class AsyncWebSocketConnection extends AbstractAsyncConnection implements
getEndPoint().write(null,nop,raw);
}
/**
* {@inheritDoc}
*/
@Override
public void write(byte[] data, int offset, int length) throws IOException
{
@ -262,6 +270,9 @@ public class AsyncWebSocketConnection extends AbstractAsyncConnection implements
write(new BinaryFrame(data,offset,length));
}
/**
* {@inheritDoc}
*/
@Override
public void write(ByteBuffer... buffers) throws IOException
{
@ -288,6 +299,9 @@ public class AsyncWebSocketConnection extends AbstractAsyncConnection implements
getEndPoint().write(null,nop,raw);
}
/**
* {@inheritDoc}
*/
@Override
public <C> void write(C context, Callback<C> callback, BaseFrame... frames) throws IOException
{
@ -312,6 +326,9 @@ public class AsyncWebSocketConnection extends AbstractAsyncConnection implements
getEndPoint().write(context,callback,raw);
}
/**
* {@inheritDoc}
*/
@Override
public <C> void write(C context, Callback<C> callback, ByteBuffer... buffers) throws IOException
{
@ -337,6 +354,9 @@ public class AsyncWebSocketConnection extends AbstractAsyncConnection implements
getEndPoint().write(context,callback,raw);
}
/**
* {@inheritDoc}
*/
@Override
public <C> void write(C context, Callback<C> callback, String... messages) throws IOException
{
@ -358,6 +378,9 @@ public class AsyncWebSocketConnection extends AbstractAsyncConnection implements
write(context,callback,frames);
}
/**
* {@inheritDoc}
*/
@Override
public void write(String message) throws IOException
{

View File

@ -10,6 +10,12 @@ public class WebSocketCloseCallback implements Callback<Void>
private AsyncWebSocketConnection conn;
private ByteBuffer buf;
public WebSocketCloseCallback(AsyncWebSocketConnection conn)
{
this.conn = conn;
this.buf = null;
}
public WebSocketCloseCallback(AsyncWebSocketConnection conn, ByteBuffer buf)
{
this.conn = conn;
@ -19,15 +25,22 @@ public class WebSocketCloseCallback implements Callback<Void>
@Override
public void completed(Void context)
{
// release buffer
this.conn.getBufferPool().release(buf);
if (buf != null)
{
// release buffer
this.conn.getBufferPool().release(buf);
}
this.conn.getEndPoint().close();
}
@Override
public void failed(Void context, Throwable cause)
{
this.conn.getBufferPool().release(buf);
if (buf != null)
{
// release buffer
this.conn.getBufferPool().release(buf);
}
this.conn.getEndPoint().close();
}
}

View File

@ -18,7 +18,6 @@ import org.eclipse.jetty.websocket.server.blockhead.BlockheadClient;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
/**
@ -51,7 +50,6 @@ public class WebSocketServletRFCTest
// trigger a WebSocket server terminated close.
if (message.equals("CRASH"))
{
System.out.printf("Got OnTextMessage");
throw new RuntimeException("Something bad happened");
}
@ -111,10 +109,9 @@ public class WebSocketServletRFCTest
/**
* Test the requirement of responding with server terminated close code 1011 when there is an unhandled (internal server error) being produced by the
* extended WebSocketServlet.
* WebSocket POJO.
*/
@Test
@Ignore("temporary, want to focus on onFillable first")
public void testInternalError() throws Exception
{
BlockheadClient client = new BlockheadClient(server.getServerUri());

View File

@ -277,7 +277,7 @@ public class BlockheadClient implements Parser.Listener
}
req.append("Sec-WebSocket-Version: ").append(version).append("\r\n");
req.append("\r\n");
write(req.toString());
writeRaw(req.toString());
}
public void setExtensions(String extensions)
@ -331,7 +331,7 @@ public class BlockheadClient implements Parser.Listener
BufferUtil.flipToFill(buf);
generator.generate(buf,frame);
BufferUtil.flipToFlush(buf,0);
out.write(BufferUtil.toArray(buf));
BufferUtil.writeTo(buf,out);
}
finally
{
@ -339,7 +339,13 @@ public class BlockheadClient implements Parser.Listener
}
}
public void write(String str) throws IOException
public void writeRaw(ByteBuffer buf) throws IOException
{
LOG.debug("write(ByteBuffer->{})",BufferUtil.toDetailString(buf));
BufferUtil.writeTo(buf,out);
}
public void writeRaw(String str) throws IOException
{
LOG.debug("write(String->{})",str);
out.write(StringUtil.getBytes(str,StringUtil.__ISO_8859_1));