433262 - WebSocket / Advanced close use cases

+ ClientCloseTest implementation of various outlined use cases.
This commit is contained in:
Joakim Erdfelt 2014-04-22 16:07:45 -07:00
parent 9f76856fcf
commit 900dea3719
24 changed files with 1436 additions and 588 deletions

View File

@ -31,11 +31,10 @@ import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.common.CloseInfo;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.events.AbstractEventDriver;
import org.eclipse.jetty.websocket.common.events.EventDriver;
import org.eclipse.jetty.websocket.jsr356.JsrSession;
import org.eclipse.jetty.websocket.jsr356.metadata.EndpointMetadata;
public abstract class AbstractJsrEventDriver extends AbstractEventDriver implements EventDriver
public abstract class AbstractJsrEventDriver extends AbstractEventDriver
{
protected final EndpointMetadata metadata;
protected final EndpointConfig config;

View File

@ -23,6 +23,7 @@ import java.io.InputStream;
import java.io.Reader;
import java.nio.ByteBuffer;
import java.util.Map;
import javax.websocket.CloseReason;
import javax.websocket.DecodeException;
@ -31,7 +32,6 @@ import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.common.events.EventDriver;
import org.eclipse.jetty.websocket.common.message.MessageInputStream;
import org.eclipse.jetty.websocket.common.message.MessageReader;
import org.eclipse.jetty.websocket.common.message.SimpleBinaryMessage;
@ -44,7 +44,7 @@ import org.eclipse.jetty.websocket.jsr356.messages.TextPartialOnMessage;
/**
* Base implementation for JSR-356 Annotated event drivers.
*/
public class JsrAnnotatedEventDriver extends AbstractJsrEventDriver implements EventDriver
public class JsrAnnotatedEventDriver extends AbstractJsrEventDriver
{
private static final Logger LOG = Log.getLogger(JsrAnnotatedEventDriver.class);
private final JsrEvents<?, ?> events;

View File

@ -23,6 +23,7 @@ import java.io.InputStream;
import java.io.Reader;
import java.nio.ByteBuffer;
import java.util.Map;
import javax.websocket.CloseReason;
import javax.websocket.Endpoint;
import javax.websocket.MessageHandler;
@ -34,7 +35,6 @@ import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.common.events.EventDriver;
import org.eclipse.jetty.websocket.common.message.MessageInputStream;
import org.eclipse.jetty.websocket.common.message.MessageReader;
import org.eclipse.jetty.websocket.jsr356.JsrPongMessage;
@ -49,7 +49,7 @@ import org.eclipse.jetty.websocket.jsr356.messages.TextWholeMessage;
/**
* EventDriver for websocket that extend from {@link javax.websocket.Endpoint}
*/
public class JsrEndpointEventDriver extends AbstractJsrEventDriver implements EventDriver
public class JsrEndpointEventDriver extends AbstractJsrEventDriver
{
private static final Logger LOG = Log.getLogger(JsrEndpointEventDriver.class);

View File

@ -32,6 +32,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
@ -158,7 +159,7 @@ public class ConnectionManager extends ContainerLifeCycle
sessions.add(session);
}
private void closeAllConnections()
private void shutdownAllConnections()
{
for (WebSocketSession session : sessions)
{
@ -166,11 +167,13 @@ public class ConnectionManager extends ContainerLifeCycle
{
try
{
session.getConnection().close();
session.getConnection().close(
StatusCode.SHUTDOWN,
"Shutdown");
}
catch (Throwable t)
{
LOG.debug("During Close All Connections",t);
LOG.debug("During Shutdown All Connections",t);
}
}
}
@ -203,7 +206,7 @@ public class ConnectionManager extends ContainerLifeCycle
@Override
protected void doStop() throws Exception
{
closeAllConnections();
shutdownAllConnections();
sessions.clear();
super.doStop();
removeBean(selector);

View File

@ -98,7 +98,7 @@ public class WebSocketClientSelectorManager extends SelectorManager
else
{
// Standard "ws://"
endPoint.setIdleTimeout(connectPromise.getClient().getMaxIdleTimeout());
endPoint.setIdleTimeout(connectPromise.getDriver().getPolicy().getIdleTimeout());
return newUpgradeConnection(channel,endPoint,connectPromise);
}
}
@ -139,4 +139,9 @@ public class WebSocketClientSelectorManager extends SelectorManager
{
this.sslContextFactory = sslContextFactory;
}
public WebSocketPolicy getPolicy()
{
return policy;
}
}

View File

@ -0,0 +1,626 @@
//
// ========================================================================
// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.client;
import static org.hamcrest.Matchers.*;
import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.SelectChannelEndPoint;
import org.eclipse.jetty.io.SelectorManager.ManagedSelector;
import org.eclipse.jetty.toolchain.test.EventQueue;
import org.eclipse.jetty.toolchain.test.TestTracker;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.websocket.api.ProtocolException;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
import org.eclipse.jetty.websocket.client.io.ConnectionManager;
import org.eclipse.jetty.websocket.client.io.WebSocketClientSelectorManager;
import org.eclipse.jetty.websocket.common.CloseInfo;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.frames.TextFrame;
import org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection;
import org.eclipse.jetty.websocket.common.test.BlockheadServer;
import org.eclipse.jetty.websocket.common.test.BlockheadServer.ServerConnection;
import org.eclipse.jetty.websocket.common.test.IncomingFramesCapture;
import org.eclipse.jetty.websocket.common.test.RawFrameBuilder;
import org.hamcrest.Matcher;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
public class ClientCloseTest
{
private static final Logger LOG = Log.getLogger(ClientCloseTest.class);
private static class CloseTrackingSocket extends WebSocketAdapter
{
private static final Logger LOG = Log.getLogger(ClientCloseTest.CloseTrackingSocket.class);
public int closeCode = -1;
public String closeReason = null;
public CountDownLatch closeLatch = new CountDownLatch(1);
public CountDownLatch openLatch = new CountDownLatch(1);
public EventQueue<String> messageQueue = new EventQueue<>();
public EventQueue<Throwable> errorQueue = new EventQueue<>();
public void assertNoCloseEvent()
{
Assert.assertThat("Client Close Event",closeLatch.getCount(),is(1L));
Assert.assertThat("Client Close Event Status Code ",closeCode,is(-1));
}
public void assertReceivedCloseEvent(int clientTimeoutMs, Matcher<Integer> statusCodeMatcher, Matcher<String> reasonMatcher)
throws InterruptedException
{
long maxTimeout = clientTimeoutMs * 2;
Assert.assertThat("Client Close Event Occurred",closeLatch.await(maxTimeout,TimeUnit.MILLISECONDS),is(true));
Assert.assertThat("Client Close Event Status Code",closeCode,statusCodeMatcher);
if (reasonMatcher == null)
{
Assert.assertThat("Client Close Event Reason",closeReason,nullValue());
}
else
{
Assert.assertThat("Client Close Event Reason",closeReason,reasonMatcher);
}
}
public void assertReceivedError(Class<? extends Throwable> expectedThrownClass, Matcher<String> messageMatcher) throws TimeoutException,
InterruptedException
{
errorQueue.awaitEventCount(1,500,TimeUnit.MILLISECONDS);
Throwable actual = errorQueue.poll();
Assert.assertThat("Client Error Event",actual,instanceOf(expectedThrownClass));
if (messageMatcher == null)
{
Assert.assertThat("Client Error Event Message",actual.getMessage(),nullValue());
}
else
{
Assert.assertThat("Client Error Event Message",actual.getMessage(),messageMatcher);
}
}
public void clearQueues()
{
messageQueue.clear();
errorQueue.clear();
}
@Override
public void onWebSocketClose(int statusCode, String reason)
{
LOG.debug("onWebSocketClose({},{})",statusCode,reason);
super.onWebSocketClose(statusCode,reason);
closeCode = statusCode;
closeReason = reason;
closeLatch.countDown();
}
@Override
public void onWebSocketConnect(Session session)
{
super.onWebSocketConnect(session);
openLatch.countDown();
}
@Override
public void onWebSocketError(Throwable cause)
{
LOG.debug("onWebSocketError",cause);
Assert.assertThat("Error capture",errorQueue.offer(cause),is(true));
}
@Override
public void onWebSocketText(String message)
{
LOG.debug("onWebSocketText({})",message);
messageQueue.offer(message);
}
public EndPoint getEndPoint() throws Exception
{
Session session = getSession();
Assert.assertThat("Session type",session,instanceOf(WebSocketSession.class));
WebSocketSession wssession = (WebSocketSession)session;
Field fld = wssession.getClass().getDeclaredField("connection");
fld.setAccessible(true);
Assert.assertThat("Field: connection",fld,notNullValue());
Object val = fld.get(wssession);
Assert.assertThat("Connection type",val,instanceOf(AbstractWebSocketConnection.class));
@SuppressWarnings("resource")
AbstractWebSocketConnection wsconn = (AbstractWebSocketConnection)val;
return wsconn.getEndPoint();
}
}
@Rule
public TestTracker tt = new TestTracker();
private BlockheadServer server;
private WebSocketClient client;
private void confirmConnection(CloseTrackingSocket clientSocket, Future<Session> clientFuture, ServerConnection serverConn) throws Exception
{
// Wait for client connect on via future
clientFuture.get(500,TimeUnit.MILLISECONDS);
// Wait for client connect via client websocket
Assert.assertThat("Client WebSocket is Open",clientSocket.openLatch.await(500,TimeUnit.MILLISECONDS),is(true));
try
{
// Send message from client to server
final String echoMsg = "echo-test";
Future<Void> testFut = clientSocket.getRemote().sendStringByFuture(echoMsg);
// Wait for send future
testFut.get(500,TimeUnit.MILLISECONDS);
// Read Frame on server side
IncomingFramesCapture serverCapture = serverConn.readFrames(1,500,TimeUnit.MILLISECONDS);
serverCapture.assertNoErrors();
serverCapture.assertFrameCount(1);
WebSocketFrame frame = serverCapture.getFrames().poll();
Assert.assertThat("Server received frame",frame.getOpCode(),is(OpCode.TEXT));
Assert.assertThat("Server received frame payload",frame.getPayloadAsUTF8(),is(echoMsg));
// Server send echo reply
serverConn.write(new TextFrame().setPayload(echoMsg));
// Wait for received echo
clientSocket.messageQueue.awaitEventCount(1,1,TimeUnit.SECONDS);
// Verify received message
String recvMsg = clientSocket.messageQueue.poll();
Assert.assertThat("Received message",recvMsg,is(echoMsg));
// Verify that there are no errors
Assert.assertThat("Error events",clientSocket.errorQueue,empty());
}
finally
{
clientSocket.clearQueues();
}
}
private void confirmServerReceivedCloseFrame(ServerConnection serverConn, int expectedCloseCode, Matcher<String> closeReasonMatcher) throws IOException,
TimeoutException
{
IncomingFramesCapture serverCapture = serverConn.readFrames(1,500,TimeUnit.MILLISECONDS);
serverCapture.assertNoErrors();
serverCapture.assertFrameCount(1);
serverCapture.assertHasFrame(OpCode.CLOSE,1);
WebSocketFrame frame = serverCapture.getFrames().poll();
Assert.assertThat("Server received close frame",frame.getOpCode(),is(OpCode.CLOSE));
CloseInfo closeInfo = new CloseInfo(frame);
Assert.assertThat("Server received close code",closeInfo.getStatusCode(),is(expectedCloseCode));
if (closeReasonMatcher == null)
{
Assert.assertThat("Server received close reason",closeInfo.getReason(),nullValue());
}
else
{
Assert.assertThat("Server received close reason",closeInfo.getReason(),closeReasonMatcher);
}
}
public static class TestWebSocketClient extends WebSocketClient
{
@Override
protected ConnectionManager newConnectionManager()
{
return new TestConnectionManager(this);
}
}
public static class TestConnectionManager extends ConnectionManager
{
public TestConnectionManager(WebSocketClient client)
{
super(client);
}
@Override
protected WebSocketClientSelectorManager newWebSocketClientSelectorManager(WebSocketClient client)
{
return new TestSelectorManager(client);
}
}
public static class TestSelectorManager extends WebSocketClientSelectorManager
{
public TestSelectorManager(WebSocketClient client)
{
super(client);
}
@Override
protected EndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey selectionKey) throws IOException
{
return new TestEndPoint(channel,selectSet,selectionKey,getScheduler(),getPolicy().getIdleTimeout());
}
}
public static class TestEndPoint extends SelectChannelEndPoint
{
public AtomicBoolean congestedFlush = new AtomicBoolean(false);
public TestEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler, long idleTimeout)
{
super(channel,selector,key,scheduler,idleTimeout);
}
@Override
public boolean flush(ByteBuffer... buffers) throws IOException
{
boolean flushed = super.flush(buffers);
congestedFlush.set(!flushed);
return flushed;
}
}
@Before
public void startClient() throws Exception
{
client = new TestWebSocketClient();
client.start();
}
@Before
public void startServer() throws Exception
{
server = new BlockheadServer();
server.start();
}
@After
public void stopClient() throws Exception
{
if (client.isRunning())
{
client.stop();
}
}
@After
public void stopServer() throws Exception
{
server.stop();
}
@Test
public void testHalfClose() throws Exception
{
// Set client timeout
final int timeout = 1000;
client.setMaxIdleTimeout(timeout);
// Client connects
CloseTrackingSocket clientSocket = new CloseTrackingSocket();
Future<Session> clientConnectFuture = client.connect(clientSocket,server.getWsUri());
// Server accepts connect
ServerConnection serverConn = server.accept();
serverConn.upgrade();
// client confirms connection via echo
confirmConnection(clientSocket,clientConnectFuture,serverConn);
// client sends close frame (code 1000, normal)
final String origCloseReason = "Normal Close";
clientSocket.getSession().close(StatusCode.NORMAL,origCloseReason);
// server receives close frame
confirmServerReceivedCloseFrame(serverConn,StatusCode.NORMAL,is(origCloseReason));
// server sends 2 messages
serverConn.write(new TextFrame().setPayload("Hello"));
serverConn.write(new TextFrame().setPayload("World"));
// server sends close frame (code 1000, no reason)
CloseInfo sclose = new CloseInfo(StatusCode.NORMAL,"From Server");
serverConn.write(sclose.asFrame());
// client receives 2 messages
clientSocket.messageQueue.awaitEventCount(2,1,TimeUnit.SECONDS);
// Verify received messages
String recvMsg = clientSocket.messageQueue.poll();
Assert.assertThat("Received message 1",recvMsg,is("Hello"));
recvMsg = clientSocket.messageQueue.poll();
Assert.assertThat("Received message 2",recvMsg,is("World"));
// Verify that there are no errors
Assert.assertThat("Error events",clientSocket.errorQueue,empty());
// client close event on ws-endpoint
clientSocket.assertReceivedCloseEvent(timeout,is(StatusCode.NORMAL),containsString("From Server"));
}
@Test
public void testNetworkCongestion() throws Exception
{
// Set client timeout
final int timeout = 1000;
client.setMaxIdleTimeout(timeout);
// Client connects
CloseTrackingSocket clientSocket = new CloseTrackingSocket();
Future<Session> clientConnectFuture = client.connect(clientSocket,server.getWsUri());
// Server accepts connect
ServerConnection serverConn = server.accept();
serverConn.upgrade();
// client confirms connection via echo
confirmConnection(clientSocket,clientConnectFuture,serverConn);
// client sends BIG frames (until it cannot write anymore)
// server must not read (for test purpose, in order to congest connection)
// when write is congested, client enqueue close frame
// client initiate write, but write never completes
EndPoint endp = clientSocket.getEndPoint();
Assert.assertThat("EndPoint is testable",endp,instanceOf(TestEndPoint.class));
TestEndPoint testendp = (TestEndPoint)endp;
char msg[] = new char[10240];
int writeCount = 0;
long writeSize = 0;
int i = 0;
while (!testendp.congestedFlush.get())
{
int z = i - ((i / 26) * 26);
char c = (char)('a' + z);
Arrays.fill(msg,c);
clientSocket.getRemote().sendStringByFuture(String.valueOf(msg));
writeCount++;
writeSize += msg.length;
}
LOG.debug("Wrote {} frames totalling {} bytes of payload before congestion kicked in",writeCount,writeSize);
// Verify that there are no errors
Assert.assertThat("Error events",clientSocket.errorQueue,empty());
// client idle timeout triggers close event on client ws-endpoint
// client close event on ws-endpoint
clientSocket.assertReceivedCloseEvent(timeout,is(StatusCode.ABNORMAL),containsString("Timeout"));
}
@Test
public void testProtocolException() throws Exception
{
// Set client timeout
final int timeout = 1000;
client.setMaxIdleTimeout(timeout);
// Client connects
CloseTrackingSocket clientSocket = new CloseTrackingSocket();
Future<Session> clientConnectFuture = client.connect(clientSocket,server.getWsUri());
// Server accepts connect
ServerConnection serverConn = server.accept();
serverConn.upgrade();
// client confirms connection via echo
confirmConnection(clientSocket,clientConnectFuture,serverConn);
// client should not have received close message (yet)
clientSocket.assertNoCloseEvent();
// server sends bad close frame (too big of a reason message)
byte msg[] = new byte[400];
Arrays.fill(msg,(byte)'x');
ByteBuffer bad = ByteBuffer.allocate(500);
RawFrameBuilder.putOpFin(bad,OpCode.CLOSE,true);
RawFrameBuilder.putLength(bad,msg.length + 2,false);
bad.putShort((short)StatusCode.NORMAL);
bad.put(msg);
BufferUtil.flipToFlush(bad,0);
serverConn.write(bad);
// client should have noticed the error
clientSocket.assertReceivedError(ProtocolException.class,containsString("Invalid control frame"));
// client parse invalid frame, notifies server of close (protocol error)
confirmServerReceivedCloseFrame(serverConn,StatusCode.PROTOCOL,allOf(containsString("Invalid control frame"),containsString("length")));
// server disconnects
serverConn.disconnect();
// client triggers close event on client ws-endpoint
clientSocket.assertReceivedCloseEvent(timeout,is(StatusCode.PROTOCOL),allOf(containsString("Invalid control frame"),containsString("length")));
}
@Test
public void testReadEOF() throws Exception
{
// Set client timeout
final int timeout = 1000;
client.setMaxIdleTimeout(timeout);
// Client connects
CloseTrackingSocket clientSocket = new CloseTrackingSocket();
Future<Session> clientConnectFuture = client.connect(clientSocket,server.getWsUri());
// Server accepts connect
ServerConnection serverConn = server.accept();
serverConn.upgrade();
// client confirms connection via echo
confirmConnection(clientSocket,clientConnectFuture,serverConn);
// client sends close frame
final String origCloseReason = "Normal Close";
clientSocket.getSession().close(StatusCode.NORMAL,origCloseReason);
// server receives close frame
confirmServerReceivedCloseFrame(serverConn,StatusCode.NORMAL,is(origCloseReason));
// client should not have received close message (yet)
clientSocket.assertNoCloseEvent();
// server shuts down connection (no frame reply)
serverConn.disconnect();
// client reads -1 (EOF)
// client triggers close event on client ws-endpoint
clientSocket.assertReceivedCloseEvent(timeout,is(StatusCode.ABNORMAL),containsString("EOF"));
}
@Test
public void testServerNoCloseHandshake() throws Exception
{
// Set client timeout
final int timeout = 1000;
client.setMaxIdleTimeout(timeout);
// Client connects
CloseTrackingSocket clientSocket = new CloseTrackingSocket();
Future<Session> clientConnectFuture = client.connect(clientSocket,server.getWsUri());
// Server accepts connect
ServerConnection serverConn = server.accept();
serverConn.upgrade();
// client confirms connection via echo
confirmConnection(clientSocket,clientConnectFuture,serverConn);
// client sends close frame
final String origCloseReason = "Normal Close";
clientSocket.getSession().close(StatusCode.NORMAL,origCloseReason);
// server receives close frame
confirmServerReceivedCloseFrame(serverConn,StatusCode.NORMAL,is(origCloseReason));
// client should not have received close message (yet)
clientSocket.assertNoCloseEvent();
// server never sends close frame handshake
// server sits idle
// client idle timeout triggers close event on client ws-endpoint
// assert - close code==1006 (abnormal)
// assert - close reason message contains (timeout)
clientSocket.assertReceivedCloseEvent(timeout,is(StatusCode.ABNORMAL),containsString("Timeout"));
}
@Test
public void testStopLifecycle() throws Exception
{
// Set client timeout
final int timeout = 1000;
client.setMaxIdleTimeout(timeout);
int clientCount = 3;
CloseTrackingSocket clientSockets[] = new CloseTrackingSocket[clientCount];
ServerConnection serverConns[] = new ServerConnection[clientCount];
// Connect Multiple Clients
for (int i = 0; i < clientCount; i++)
{
// Client Request Upgrade
clientSockets[i] = new CloseTrackingSocket();
Future<Session> clientConnectFuture = client.connect(clientSockets[i],server.getWsUri());
// Server accepts connection
serverConns[i] = server.accept();
serverConns[i].upgrade();
// client confirms connection via echo
confirmConnection(clientSockets[i],clientConnectFuture,serverConns[i]);
}
// client lifecycle stop
client.stop();
// clients send close frames (code 1001, shutdown)
for (int i = 0; i < clientCount; i++)
{
// server receives close frame
confirmServerReceivedCloseFrame(serverConns[i],StatusCode.SHUTDOWN,containsString("Shutdown"));
}
// clients disconnect
for (int i = 0; i < clientCount; i++)
{
clientSockets[i].assertReceivedCloseEvent(timeout,is(StatusCode.SHUTDOWN),containsString("Shutdown"));
}
}
@Test
public void testWriteException() throws Exception
{
// Set client timeout
final int timeout = 1000;
client.setMaxIdleTimeout(timeout);
// Client connects
CloseTrackingSocket clientSocket = new CloseTrackingSocket();
Future<Session> clientConnectFuture = client.connect(clientSocket,server.getWsUri());
// Server accepts connect
ServerConnection serverConn = server.accept();
serverConn.upgrade();
// client confirms connection via echo
confirmConnection(clientSocket,clientConnectFuture,serverConn);
// setup client endpoint for write failure (test only)
EndPoint endp = clientSocket.getEndPoint();
endp.shutdownOutput();
// client enqueue close frame
// client write failure
final String origCloseReason = "Normal Close";
clientSocket.getSession().close(StatusCode.NORMAL,origCloseReason);
clientSocket.assertReceivedError(EofException.class,null);
// client triggers close event on client ws-endpoint
// assert - close code==1006 (abnormal)
// assert - close reason message contains (write failure)
clientSocket.assertReceivedCloseEvent(timeout,is(StatusCode.ABNORMAL),containsString("EOF"));
}
}

View File

@ -19,7 +19,6 @@
package org.eclipse.jetty.websocket.client;
import java.io.IOException;
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@ -32,7 +31,6 @@ public class ServerWriteThread extends Thread
{
private static final Logger LOG = Log.getLogger(ServerWriteThread.class);
private final ServerConnection conn;
private Exchanger<String> exchanger;
private int slowness = -1;
private int messageCount = 100;
private String message = "Hello";
@ -42,11 +40,6 @@ public class ServerWriteThread extends Thread
this.conn = conn;
}
public Exchanger<String> getExchanger()
{
return exchanger;
}
public String getMessage()
{
return message;
@ -73,12 +66,6 @@ public class ServerWriteThread extends Thread
{
conn.write(new TextFrame().setPayload(message));
if (exchanger != null)
{
// synchronized on exchange
exchanger.exchange(message);
}
m.incrementAndGet();
if (slowness > 0)
@ -93,11 +80,6 @@ public class ServerWriteThread extends Thread
}
}
public void setExchanger(Exchanger<String> exchanger)
{
this.exchanger = exchanger;
}
public void setMessage(String message)
{
this.message = message;

View File

@ -49,7 +49,7 @@ public class SlowServerTest
public void startClient() throws Exception
{
client = new WebSocketClient();
client.getPolicy().setIdleTimeout(60000);
client.setMaxIdleTimeout(60000);
client.start();
}
@ -78,7 +78,7 @@ public class SlowServerTest
{
JettyTrackingSocket tsocket = new JettyTrackingSocket();
client.setMasker(new ZeroMasker());
client.getPolicy().setIdleTimeout(60000);
client.setMaxIdleTimeout(60000);
URI wsUri = server.getWsUri();
Future<Session> future = client.connect(tsocket,wsUri);
@ -123,41 +123,38 @@ public class SlowServerTest
@Slow
public void testServerSlowToSend() throws Exception
{
// final Exchanger<String> exchanger = new Exchanger<String>();
JettyTrackingSocket tsocket = new JettyTrackingSocket();
// tsocket.messageExchanger = exchanger;
JettyTrackingSocket clientSocket = new JettyTrackingSocket();
client.setMasker(new ZeroMasker());
client.getPolicy().setIdleTimeout(60000);
client.setMaxIdleTimeout(60000);
URI wsUri = server.getWsUri();
Future<Session> future = client.connect(tsocket,wsUri);
Future<Session> clientConnectFuture = client.connect(clientSocket,wsUri);
ServerConnection sconnection = server.accept();
sconnection.setSoTimeout(60000);
sconnection.upgrade();
ServerConnection serverConn = server.accept();
serverConn.setSoTimeout(60000);
serverConn.upgrade();
// Confirm connected
future.get(500,TimeUnit.MILLISECONDS);
tsocket.waitForConnected(500,TimeUnit.MILLISECONDS);
clientConnectFuture.get(500,TimeUnit.MILLISECONDS);
clientSocket.waitForConnected(500,TimeUnit.MILLISECONDS);
// Have server write slowly.
int messageCount = 1000;
ServerWriteThread writer = new ServerWriteThread(sconnection);
ServerWriteThread writer = new ServerWriteThread(serverConn);
writer.setMessageCount(messageCount);
writer.setMessage("Hello");
// writer.setExchanger(exchanger);
writer.setSlowness(10);
writer.start();
writer.join();
// Verify receive
Assert.assertThat("Message Receive Count",tsocket.messageQueue.size(),is(messageCount));
Assert.assertThat("Message Receive Count",clientSocket.messageQueue.size(),is(messageCount));
// Close
sconnection.close(StatusCode.NORMAL);
serverConn.close(StatusCode.NORMAL);
Assert.assertTrue("Client Socket Closed",tsocket.closeLatch.await(10,TimeUnit.SECONDS));
tsocket.assertCloseCode(StatusCode.NORMAL);
Assert.assertTrue("Client Socket Closed",clientSocket.closeLatch.await(10,TimeUnit.SECONDS));
clientSocket.assertCloseCode(StatusCode.NORMAL);
}
}

View File

@ -1,115 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.client;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import java.net.URI;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.toolchain.test.TestTracker;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.common.test.BlockheadServer;
import org.eclipse.jetty.websocket.common.test.BlockheadServer.ServerConnection;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
/**
* Various tests for Timeout handling
*/
public class TimeoutTest
{
@Rule
public TestTracker tt = new TestTracker();
private BlockheadServer server;
private WebSocketClient client;
@Before
public void startClient() throws Exception
{
client = new WebSocketClient();
client.getPolicy().setIdleTimeout(250); // idle timeout (for all tests here)
client.start();
}
@Before
public void startServer() throws Exception
{
server = new BlockheadServer();
server.start();
}
@After
public void stopClient() throws Exception
{
client.stop();
}
@After
public void stopServer() throws Exception
{
server.stop();
}
/**
* In a situation where the upgrade/connection is successful, and there is no activity for a while, the idle timeout triggers on the client side and
* automatically initiates a close handshake.
*/
@Test
public void testIdleDetectedByClient() throws Exception
{
JettyTrackingSocket wsocket = new JettyTrackingSocket();
URI wsUri = server.getWsUri();
client.setMaxIdleTimeout(1000);
Future<Session> future = client.connect(wsocket,wsUri);
ServerConnection ssocket = server.accept();
ssocket.upgrade();
try
{
ssocket.startEcho();
// Validate that connect occurred
future.get(500,TimeUnit.MILLISECONDS);
wsocket.waitForConnected(500,TimeUnit.MILLISECONDS);
// Wait for inactivity idle timeout.
long start = System.currentTimeMillis();
wsocket.waitForClose(2,TimeUnit.SECONDS);
long end = System.currentTimeMillis();
long dur = (end - start);
// Make sure idle timeout takes less than 5 total seconds
Assert.assertThat("Idle Timeout",dur,lessThanOrEqualTo(3000L));
// Client should see a close event, with abnormal status NO_CLOSE
wsocket.assertCloseCode(StatusCode.ABNORMAL);
}
finally
{
ssocket.stopEcho();
}
}
}

View File

@ -1,12 +1,17 @@
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
org.eclipse.jetty.LEVEL=WARN
# org.eclipse.jetty.LEVEL=DEBUG
# org.eclipse.jetty.io.ChannelEndPoint.LEVEL=INFO
# org.eclipse.jetty.io.ChannelEndPoint.LEVEL=DEBUG
# org.eclipse.jetty.io.SelectChannelEndPoint.LEVEL=DEBUG
# org.eclipse.jetty.io.IdleTimeout.LEVEL=DEBUG
# org.eclipse.jetty.io.FillInterest.LEVEL=DEBUG
# org.eclipse.jetty.io.AbstractConnection.LEVEL=DEBUG
# org.eclipse.jetty.websocket.LEVEL=WARN
# org.eclipse.jetty.websocket.LEVEL=DEBUG
# org.eclipse.jetty.websocket.client.LEVEL=DEBUG
# org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection.LEVEL=DEBUG
# org.eclipse.jetty.websocket.common.io.WriteBytesProvider.LEVEL=DEBUG
# org.eclipse.jetty.websocket.common.io.IOState.LEVEL=DEBUG
# org.eclipse.jetty.websocket.common.Generator.LEVEL=DEBUG
# org.eclipse.jetty.websocket.common.Parser.LEVEL=DEBUG
# org.eclipse.jetty.websocket.client.TrackingSocket.LEVEL=DEBUG

View File

@ -20,6 +20,7 @@ package org.eclipse.jetty.websocket.common;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.Utf8Appendable.NotUtf8Exception;
import org.eclipse.jetty.util.Utf8StringBuilder;
@ -65,7 +66,8 @@ public class CloseInfo
statusCode |= (data.get() & 0xFF) << 8;
statusCode |= (data.get() & 0xFF);
if(validate) {
if (validate)
{
if ((statusCode < StatusCode.NORMAL) || (statusCode == StatusCode.UNDEFINED) || (statusCode == StatusCode.NO_CLOSE)
|| (statusCode == StatusCode.NO_CODE) || ((statusCode > 1011) && (statusCode <= 2999)) || (statusCode >= 5000))
{
@ -120,7 +122,7 @@ public class CloseInfo
public CloseInfo(int statusCode)
{
this(statusCode, null);
this(statusCode,null);
}
public CloseInfo(int statusCode, String reason)
@ -144,8 +146,9 @@ public class CloseInfo
utf = StringUtil.getUtf8Bytes(reason);
len += utf.length;
}
ByteBuffer buf = ByteBuffer.allocate(len);
ByteBuffer buf = BufferUtil.allocate(len);
BufferUtil.flipToFill(buf);
buf.put((byte)((statusCode >>> 8) & 0xFF));
buf.put((byte)((statusCode >>> 0) & 0xFF));
@ -153,7 +156,7 @@ public class CloseInfo
{
buf.put(utf,0,utf.length);
}
buf.flip();
BufferUtil.flipToFlush(buf,0);
return buf;
}
@ -162,7 +165,14 @@ public class CloseInfo
{
CloseFrame frame = new CloseFrame();
frame.setFin(true);
frame.setPayload(asByteBuffer());
if ((statusCode >= 1000) && (statusCode != StatusCode.NO_CLOSE) && (statusCode != StatusCode.NO_CODE))
{
if (statusCode == StatusCode.FAILED_TLS_HANDSHAKE)
{
throw new ProtocolException("Close Frame with status code " + statusCode + " not allowed (per RFC6455)");
}
frame.setPayload(asByteBuffer());
}
return frame;
}
@ -180,10 +190,10 @@ public class CloseInfo
{
return !((statusCode == StatusCode.NORMAL) || (statusCode == StatusCode.NO_CODE));
}
public boolean isAbnormal()
{
return (statusCode == StatusCode.ABNORMAL);
return (statusCode != StatusCode.NORMAL);
}
@Override

View File

@ -96,8 +96,9 @@ public class Parser
private void assertSanePayloadLength(long len)
{
if (LOG.isDebugEnabled())
LOG.debug("Payload Length: {} - {}",len,this);
if (LOG.isDebugEnabled()) {
LOG.debug("{} Payload Length: {} - {}",policy.getBehavior(),len,this);
}
// Since we use ByteBuffer so often, having lengths over Integer.MAX_VALUE is really impossible.
if (len > Integer.MAX_VALUE)
@ -239,7 +240,7 @@ public class Parser
incomingFramesHandler.incomingError(e);
}
public void parse(ByteBuffer buffer)
public void parse(ByteBuffer buffer) throws WebSocketException
{
if (buffer.remaining() <= 0)
{
@ -266,13 +267,20 @@ public class Parser
{
buffer.position(buffer.limit()); // consume remaining
reset();
// let session know
notifyWebSocketException(e);
// need to throw for proper close behavior in connection
throw e;
}
catch (Throwable t)
{
buffer.position(buffer.limit()); // consume remaining
reset();
notifyWebSocketException(new WebSocketException(t));
// let session know
WebSocketException e = new WebSocketException(t);
notifyWebSocketException(e);
// need to throw for proper close behavior in connection
throw e;
}
}
@ -299,7 +307,9 @@ public class Parser
private boolean parseFrame(ByteBuffer buffer)
{
if (LOG.isDebugEnabled())
{
LOG.debug("{} Parsing {} bytes",policy.getBehavior(),buffer.remaining());
}
while (buffer.hasRemaining())
{
switch (state)
@ -318,7 +328,8 @@ public class Parser
}
if (LOG.isDebugEnabled())
LOG.debug("OpCode {}, fin={} rsv={}{}{}",
LOG.debug("{} OpCode {}, fin={} rsv={}{}{}",
policy.getBehavior(),
OpCode.name(opcode),
fin,
(isRsv1InUse()?'1':'.'),
@ -412,11 +423,6 @@ public class Parser
throw new ProtocolException("RSV3 not allowed to be set");
}
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("OpCode {}, fin={} rsv=000",OpCode.name(opcode),fin);
}
state = State.PAYLOAD_LEN;
break;
@ -591,8 +597,9 @@ public class Parser
buffer.limit(limit);
buffer.position(buffer.position() + window.remaining());
if (LOG.isDebugEnabled())
LOG.debug("Window: {}",BufferUtil.toDetailString(window));
if (LOG.isDebugEnabled()) {
LOG.debug("{} Window: {}",policy.getBehavior(),BufferUtil.toDetailString(window));
}
maskProcessor.process(window);

View File

@ -41,6 +41,7 @@ import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.SuspendToken;
import org.eclipse.jetty.websocket.api.UpgradeRequest;
import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.api.WebSocketBehavior;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory;
@ -90,20 +91,19 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc
@Override
public void close()
{
this.close(StatusCode.NORMAL, null);
this.close(StatusCode.NORMAL,null);
}
@Override
public void close(CloseStatus closeStatus)
{
this.close(closeStatus.getCode(), closeStatus.getPhrase());
this.close(closeStatus.getCode(),closeStatus.getPhrase());
}
@Override
public void close(int statusCode, String reason)
{
connection.close(statusCode, reason);
notifyClose(statusCode, reason);
connection.close(statusCode,reason);
}
/**
@ -115,7 +115,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc
connection.disconnect();
// notify of harsh disconnect
notifyClose(StatusCode.NO_CLOSE, "Harsh disconnect");
notifyClose(StatusCode.NO_CLOSE,"Harsh disconnect");
}
public void dispatch(Runnable runnable)
@ -130,7 +130,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc
out.append(indent).append(" +- incomingHandler : ");
if (incomingHandler instanceof Dumpable)
{
((Dumpable)incomingHandler).dump(out, indent + " ");
((Dumpable)incomingHandler).dump(out,indent + " ");
}
else
{
@ -140,7 +140,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc
out.append(indent).append(" +- outgoingHandler : ");
if (outgoingHandler instanceof Dumpable)
{
((Dumpable)outgoingHandler).dump(out, indent + " ");
((Dumpable)outgoingHandler).dump(out,indent + " ");
}
else
{
@ -273,7 +273,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc
{
final int prime = 31;
int result = 1;
result = (prime * result) + ((connection == null) ? 0 : connection.hashCode());
result = (prime * result) + ((connection == null)?0:connection.hashCode());
return result;
}
@ -328,7 +328,11 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc
public void notifyClose(int statusCode, String reason)
{
websocket.onClose(new CloseInfo(statusCode, reason));
if (LOG.isDebugEnabled())
{
LOG.debug("notifyClose({},{})",statusCode,reason);
}
websocket.onClose(new CloseInfo(statusCode,reason));
}
public void notifyError(Throwable cause)
@ -342,12 +346,13 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc
{
switch (state)
{
case CLOSING:
case CLOSED:
// notify session listeners
for (SessionListener listener : sessionListeners)
{
try
{
LOG.debug("{}.onSessionClosed()",listener.getClass().getSimpleName());
listener.onSessionClosed(this);
}
catch (Throwable t)
@ -355,12 +360,10 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc
LOG.ignore(t);
}
}
break;
case CLOSED:
IOState ioState = this.connection.getIOState();
CloseInfo close = ioState.getCloseInfo();
// confirmed close of local endpoint
notifyClose(close.getStatusCode(), close.getReason());
notifyClose(close.getStatusCode(),close.getReason());
break;
case OPEN:
// notify session listeners
@ -394,17 +397,32 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc
connection.getIOState().onConnected();
// Connect remote
remote = new WebSocketRemoteEndpoint(connection, outgoingHandler, getBatchMode());
remote = new WebSocketRemoteEndpoint(connection,outgoingHandler,getBatchMode());
// Open WebSocket
websocket.openSession(this);
// Open connection
connection.getIOState().onOpened();
if (LOG.isDebugEnabled())
try
{
LOG.debug("open -> {}", dump());
// Open WebSocket
websocket.openSession(this);
// Open connection
connection.getIOState().onOpened();
if (LOG.isDebugEnabled())
{
LOG.debug("open -> {}",dump());
}
}
catch (Throwable t)
{
// Exception on end-user WS-Endpoint.
// Fast-fail & close connection with reason.
int statusCode = StatusCode.SERVER_ERROR;
if(policy.getBehavior() == WebSocketBehavior.CLIENT)
{
statusCode = StatusCode.POLICY_VIOLATION;
}
close(statusCode,t.getMessage());
}
}
@ -444,11 +462,11 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc
List<String> values = entry.getValue();
if (values != null)
{
this.parameterMap.put(entry.getKey(), values.toArray(new String[values.size()]));
this.parameterMap.put(entry.getKey(),values.toArray(new String[values.size()]));
}
else
{
this.parameterMap.put(entry.getKey(), new String[0]);
this.parameterMap.put(entry.getKey(),new String[0]);
}
}
}

View File

@ -88,13 +88,7 @@ public abstract class AbstractEventDriver implements IncomingFrames, EventDriver
{
if (LOG.isDebugEnabled())
{
LOG.debug("incoming(WebSocketException)",e);
}
if (e instanceof CloseException)
{
CloseException close = (CloseException)e;
terminateConnection(close.getStatusCode(),close.getMessage());
LOG.debug("incomingError(" + e.getClass().getName() + ")",e);
}
onError(e);
@ -105,7 +99,7 @@ public abstract class AbstractEventDriver implements IncomingFrames, EventDriver
{
if (LOG.isDebugEnabled())
{
LOG.debug("{}.onFrame({})",websocket.getClass().getSimpleName(),frame);
LOG.debug("incomingFrame({})",frame);
}
try
@ -226,6 +220,7 @@ public abstract class AbstractEventDriver implements IncomingFrames, EventDriver
catch (Throwable t)
{
unhandled(t);
throw t;
}
}

View File

@ -32,7 +32,6 @@ import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.StringUtil;
@ -54,13 +53,13 @@ import org.eclipse.jetty.websocket.common.CloseInfo;
import org.eclipse.jetty.websocket.common.ConnectionState;
import org.eclipse.jetty.websocket.common.Generator;
import org.eclipse.jetty.websocket.common.LogicalConnection;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.Parser;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener;
/**
* Provides the implementation of {@link LogicalConnection} within the
* framework of the new {@link Connection} framework of {@code jetty-io}.
* Provides the implementation of {@link LogicalConnection} within the framework of the new {@link Connection} framework of {@code jetty-io}.
*/
public abstract class AbstractWebSocketConnection extends AbstractConnection implements LogicalConnection, ConnectionStateListener, Dumpable
{
@ -68,7 +67,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
{
private Flusher(ByteBufferPool bufferPool, Generator generator, EndPoint endpoint)
{
super(bufferPool, generator, endpoint, getPolicy().getMaxBinaryMessageBufferSize(), 8);
super(bufferPool,generator,endpoint,getPolicy().getMaxBinaryMessageBufferSize(),8);
}
@Override
@ -106,7 +105,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
// Abnormal Close
reason = CloseStatus.trimMaxReasonLength(reason);
session.notifyError(x);
session.notifyClose(StatusCode.NO_CLOSE,reason);
session.notifyClose(StatusCode.ABNORMAL,reason);
disconnect(); // disconnect endpoint & connection
}
@ -116,10 +115,11 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
{
private final boolean outputOnly;
public OnDisconnectCallback(boolean outputOnly) {
public OnDisconnectCallback(boolean outputOnly)
{
this.outputOnly = outputOnly;
}
@Override
public void writeFailed(Throwable x)
{
@ -218,10 +218,10 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
@Override
public void close(int statusCode, String reason)
{
LOG.debug("close({},{})",statusCode,reason);
CloseInfo close = new CloseInfo(statusCode,reason);
if (statusCode == StatusCode.ABNORMAL)
{
flusher.close(); // TODO this makes the IdleTimeoutTest pass, but I'm dubious it is the correct way
ioState.onAbnormalClose(close);
}
else
@ -230,7 +230,6 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
}
}
@Override
public void disconnect()
{
@ -366,7 +365,9 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
@Override
public void onClose()
{
LOG.debug("{} onClose()",policy.getBehavior());
super.onClose();
// ioState.onDisconnected();
flusher.close();
}
@ -385,18 +386,15 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
{
// Fire out a close frame, indicating abnormal shutdown, then disconnect
CloseInfo abnormal = new CloseInfo(StatusCode.SHUTDOWN,"Abnormal Close - " + ioState.getCloseInfo().getReason());
outgoingFrame(abnormal.asFrame(),new OnDisconnectCallback(false), BatchMode.OFF);
}
else
{
// Just disconnect
this.disconnect(false);
outgoingFrame(abnormal.asFrame(),new OnDisconnectCallback(false),BatchMode.OFF);
}
// Just disconnect
this.disconnect(false);
break;
case CLOSING:
CloseInfo close = ioState.getCloseInfo();
// reply to close handshake from remote
outgoingFrame(close.asFrame(),new OnDisconnectCallback(true), BatchMode.OFF);
outgoingFrame(close.asFrame(),new OnDisconnectCallback(true),BatchMode.OFF);
default:
break;
}
@ -447,20 +445,26 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
@Override
protected boolean onReadTimeout()
{
LOG.debug("{} Read Timeout",policy.getBehavior());
IOState state = getIOState();
if ((state.getConnectionState() == ConnectionState.CLOSING) || (state.getConnectionState() == ConnectionState.CLOSED))
ConnectionState cstate = state.getConnectionState();
LOG.debug("{} Read Timeout - {}",policy.getBehavior(),cstate);
if (cstate == ConnectionState.CLOSED)
{
// close already initiated, extra timeouts not relevant
// close already completed, extra timeouts not relevant
// allow underlying connection and endpoint to disconnect on its own
return true;
}
// Initiate close - politely send close frame.
session.notifyError(new SocketTimeoutException("Timeout on Read"));
// This is an Abnormal Close condition
close(StatusCode.ABNORMAL,"Idle Timeout");
try
{
session.notifyError(new SocketTimeoutException("Timeout on Read"));
}
finally
{
// This is an Abnormal Close condition
close(StatusCode.ABNORMAL,"Idle Timeout");
}
return false;
}
@ -476,7 +480,21 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
LOG.debug("outgoingFrame({}, {})",frame,callback);
}
flusher.enqueue(frame,callback, batchMode);
CloseInfo close = null;
// grab a copy of the frame details before masking and whatnot
if (frame.getOpCode() == OpCode.CLOSE)
{
close = new CloseInfo(frame);
}
flusher.enqueue(frame,callback,batchMode);
// now trigger local close
if (close != null)
{
LOG.debug("outgoing CLOSE frame - {}: {}",frame,close);
ioState.onCloseLocal(close);
}
}
private int read(ByteBuffer buffer)
@ -504,7 +522,6 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
LOG.debug("Filled {} bytes - {}",filled,BufferUtil.toDetailString(buffer));
}
parser.parse(buffer);
// TODO: has the end user application already consumed what it was given?
}
}
}
@ -520,6 +537,12 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
close(e.getStatusCode(),e.getMessage());
return -1;
}
catch (Throwable t)
{
LOG.warn(t);
close(StatusCode.ABNORMAL,t.getMessage());
return -1;
}
}
@Override

View File

@ -29,7 +29,6 @@ import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.ArrayQueue;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -45,246 +44,22 @@ import org.eclipse.jetty.websocket.common.frames.BinaryFrame;
*/
public class FrameFlusher
{
public static final BinaryFrame FLUSH_FRAME = new BinaryFrame();
private static final Logger LOG = Log.getLogger(FrameFlusher.class);
private final ByteBufferPool bufferPool;
private final EndPoint endpoint;
private final int bufferSize;
private final Generator generator;
private final int maxGather;
private final Object lock = new Object();
private final ArrayQueue<FrameEntry> queue = new ArrayQueue<>(16, 16, lock);
private final Flusher flusher = new Flusher();
private final AtomicBoolean closed = new AtomicBoolean();
private volatile Throwable failure;
public FrameFlusher(ByteBufferPool bufferPool, Generator generator, EndPoint endpoint, int bufferSize, int maxGather)
{
this.bufferPool = bufferPool;
this.endpoint = endpoint;
this.bufferSize = bufferSize;
this.generator = Objects.requireNonNull(generator);
this.maxGather = maxGather;
}
public void enqueue(Frame frame, WriteCallback callback, BatchMode batchMode)
{
if (closed.get())
{
notifyCallbackFailure(callback, new EOFException("Connection has been closed locally"));
return;
}
if (flusher.isFailed())
{
notifyCallbackFailure(callback, failure);
return;
}
FrameEntry entry = new FrameEntry(frame, callback, batchMode);
synchronized (lock)
{
switch (frame.getOpCode())
{
case OpCode.PING:
{
// Prepend PINGs so they are processed first.
queue.add(0, entry);
break;
}
case OpCode.CLOSE:
{
// There may be a chance that other frames are
// added after this close frame, but we will
// fail them later to keep it simple here.
closed.set(true);
queue.add(entry);
break;
}
default:
{
queue.add(entry);
break;
}
}
}
if (LOG.isDebugEnabled())
LOG.debug("{} queued {}", this, entry);
flusher.iterate();
}
public void close()
{
if (closed.compareAndSet(false, true))
{
LOG.debug("{} closing {}", this);
EOFException eof = new EOFException("Connection has been closed locally");
flusher.failed(eof);
// Fail also queued entries.
List<FrameEntry> entries = new ArrayList<>();
synchronized (lock)
{
entries.addAll(queue);
queue.clear();
}
// Notify outside sync block.
for (FrameEntry entry : entries)
notifyCallbackFailure(entry.callback, eof);
}
}
protected void onFailure(Throwable x)
{
LOG.warn(x);
}
protected void notifyCallbackSuccess(WriteCallback callback)
{
try
{
if (callback != null)
callback.writeSuccess();
}
catch (Throwable x)
{
LOG.debug("Exception while notifying success of callback " + callback, x);
}
}
protected void notifyCallbackFailure(WriteCallback callback, Throwable failure)
{
try
{
if (callback != null)
callback.writeFailed(failure);
}
catch (Throwable x)
{
LOG.debug("Exception while notifying failure of callback " + callback, x);
}
}
@Override
public String toString()
{
ByteBuffer aggregate = flusher.aggregate;
return String.format("%s[queueSize=%d,aggregateSize=%d,failure=%s]",
getClass().getSimpleName(),
queue.size(),
aggregate == null ? 0 : aggregate.position(),
failure);
}
private class Flusher extends IteratingCallback
{
private final List<FrameEntry> entries = new ArrayList<>(maxGather);
private final List<ByteBuffer> buffers = new ArrayList<>(maxGather * 2 + 1);
private final List<ByteBuffer> buffers = new ArrayList<>((maxGather * 2) + 1);
private ByteBuffer aggregate;
private BatchMode batchMode;
@Override
protected Action process() throws Exception
{
int space = aggregate == null ? bufferSize : BufferUtil.space(aggregate);
BatchMode currentBatchMode = BatchMode.AUTO;
synchronized (lock)
{
while (entries.size() <= maxGather && !queue.isEmpty())
{
FrameEntry entry = queue.remove(0);
currentBatchMode = BatchMode.max(currentBatchMode, entry.batchMode);
// Force flush if we need to.
if (entry.frame == FLUSH_FRAME)
currentBatchMode = BatchMode.OFF;
int payloadLength = BufferUtil.length(entry.frame.getPayload());
int approxFrameLength = Generator.MAX_HEADER_LENGTH + payloadLength;
// If it is a "big" frame, avoid copying into the aggregate buffer.
if (approxFrameLength > (bufferSize >> 2))
currentBatchMode = BatchMode.OFF;
// If the aggregate buffer overflows, do not batch.
space -= approxFrameLength;
if (space <= 0)
currentBatchMode = BatchMode.OFF;
entries.add(entry);
}
}
if (LOG.isDebugEnabled())
LOG.debug("{} processing {} entries: {}", FrameFlusher.this, entries.size(), entries);
if (entries.isEmpty())
{
if (batchMode != BatchMode.AUTO)
{
// Nothing more to do, release the aggregate buffer if we need to.
// Releasing it here rather than in succeeded() allows for its reuse.
releaseAggregate();
return Action.IDLE;
}
LOG.debug("{} auto flushing", FrameFlusher.this);
return flush();
}
batchMode = currentBatchMode;
return currentBatchMode == BatchMode.OFF ? flush() : batch();
}
private Action flush()
{
if (!BufferUtil.isEmpty(aggregate))
{
buffers.add(aggregate);
if (LOG.isDebugEnabled())
LOG.debug("{} flushing aggregate {}", FrameFlusher.this, aggregate);
}
// Do not allocate the iterator here.
for (int i = 0; i < entries.size(); ++i)
{
FrameEntry entry = entries.get(i);
// Skip the "synthetic" frame used for flushing.
if (entry.frame == FLUSH_FRAME)
continue;
buffers.add(entry.generateHeaderBytes());
ByteBuffer payload = entry.frame.getPayload();
if (BufferUtil.hasContent(payload))
buffers.add(payload);
}
if (LOG.isDebugEnabled())
LOG.debug("{} flushing {} frames: {}", FrameFlusher.this, entries.size(), entries);
if (buffers.isEmpty())
{
releaseAggregate();
// We may have the FLUSH_FRAME to notify.
succeedEntries();
return Action.IDLE;
}
endpoint.write(this, buffers.toArray(new ByteBuffer[buffers.size()]));
buffers.clear();
return Action.SCHEDULED;
}
private Action batch()
{
if (aggregate == null)
{
aggregate = bufferPool.acquire(bufferSize, true);
aggregate = bufferPool.acquire(bufferSize,true);
if (LOG.isDebugEnabled())
LOG.debug("{} acquired aggregate buffer {}", FrameFlusher.this, aggregate);
{
LOG.debug("{} acquired aggregate buffer {}",FrameFlusher.this,aggregate);
}
}
// Do not allocate the iterator here.
@ -296,17 +71,149 @@ public class FrameFlusher
ByteBuffer payload = entry.frame.getPayload();
if (BufferUtil.hasContent(payload))
BufferUtil.append(aggregate, payload);
{
BufferUtil.append(aggregate,payload);
}
}
if (LOG.isDebugEnabled())
LOG.debug("{} aggregated {} frames: {}", FrameFlusher.this, entries.size(), entries);
{
LOG.debug("{} aggregated {} frames: {}",FrameFlusher.this,entries.size(),entries);
}
succeeded();
return Action.SCHEDULED;
}
@Override
protected void completed()
{
// This IteratingCallback never completes.
}
@Override
public void failed(Throwable x)
{
for (FrameEntry entry : entries)
{
notifyCallbackFailure(entry.callback,x);
entry.release();
}
entries.clear();
super.failed(x);
failure = x;
onFailure(x);
}
private Action flush()
{
if (!BufferUtil.isEmpty(aggregate))
{
buffers.add(aggregate);
if (LOG.isDebugEnabled())
{
LOG.debug("{} flushing aggregate {}",FrameFlusher.this,aggregate);
}
}
// Do not allocate the iterator here.
for (int i = 0; i < entries.size(); ++i)
{
FrameEntry entry = entries.get(i);
// Skip the "synthetic" frame used for flushing.
if (entry.frame == FLUSH_FRAME)
{
continue;
}
buffers.add(entry.generateHeaderBytes());
ByteBuffer payload = entry.frame.getPayload();
if (BufferUtil.hasContent(payload))
{
buffers.add(payload);
}
}
if (LOG.isDebugEnabled())
{
LOG.debug("{} flushing {} frames: {}",FrameFlusher.this,entries.size(),entries);
}
if (buffers.isEmpty())
{
releaseAggregate();
// We may have the FLUSH_FRAME to notify.
succeedEntries();
return Action.IDLE;
}
endpoint.write(this,buffers.toArray(new ByteBuffer[buffers.size()]));
buffers.clear();
return Action.SCHEDULED;
}
@Override
protected Action process() throws Exception
{
int space = aggregate == null?bufferSize:BufferUtil.space(aggregate);
BatchMode currentBatchMode = BatchMode.AUTO;
synchronized (lock)
{
while ((entries.size() <= maxGather) && !queue.isEmpty())
{
FrameEntry entry = queue.remove(0);
currentBatchMode = BatchMode.max(currentBatchMode,entry.batchMode);
// Force flush if we need to.
if (entry.frame == FLUSH_FRAME)
{
currentBatchMode = BatchMode.OFF;
}
int payloadLength = BufferUtil.length(entry.frame.getPayload());
int approxFrameLength = Generator.MAX_HEADER_LENGTH + payloadLength;
// If it is a "big" frame, avoid copying into the aggregate buffer.
if (approxFrameLength > (bufferSize >> 2))
{
currentBatchMode = BatchMode.OFF;
}
// If the aggregate buffer overflows, do not batch.
space -= approxFrameLength;
if (space <= 0)
{
currentBatchMode = BatchMode.OFF;
}
entries.add(entry);
}
}
if (LOG.isDebugEnabled())
{
LOG.debug("{} processing {} entries: {}",FrameFlusher.this,entries.size(),entries);
}
if (entries.isEmpty())
{
if (batchMode != BatchMode.AUTO)
{
// Nothing more to do, release the aggregate buffer if we need to.
// Releasing it here rather than in succeeded() allows for its reuse.
releaseAggregate();
return Action.IDLE;
}
LOG.debug("{} auto flushing",FrameFlusher.this);
return flush();
}
batchMode = currentBatchMode;
return currentBatchMode == BatchMode.OFF?flush():batch();
}
private void releaseAggregate()
{
if (aggregate != null && BufferUtil.isEmpty(aggregate))
if ((aggregate != null) && BufferUtil.isEmpty(aggregate))
{
bufferPool.release(aggregate);
aggregate = null;
@ -331,26 +238,6 @@ public class FrameFlusher
}
entries.clear();
}
@Override
protected void completed()
{
// This IteratingCallback never completes.
}
@Override
public void failed(Throwable x)
{
for (FrameEntry entry : entries)
{
notifyCallbackFailure(entry.callback, x);
entry.release();
}
entries.clear();
super.failed(x);
failure = x;
onFailure(x);
}
}
private class FrameEntry
@ -374,7 +261,7 @@ public class FrameFlusher
private void generateHeaderBytes(ByteBuffer buffer)
{
generator.generateHeaderBytes(frame, buffer);
generator.generateHeaderBytes(frame,buffer);
}
private void release()
@ -389,7 +276,145 @@ public class FrameFlusher
@Override
public String toString()
{
return String.format("%s[%s,%s,%s,%s]", getClass().getSimpleName(), frame, callback, batchMode, failure);
return String.format("%s[%s,%s,%s,%s]",getClass().getSimpleName(),frame,callback,batchMode,failure);
}
}
public static final BinaryFrame FLUSH_FRAME = new BinaryFrame();
private static final Logger LOG = Log.getLogger(FrameFlusher.class);
private final ByteBufferPool bufferPool;
private final EndPoint endpoint;
private final int bufferSize;
private final Generator generator;
private final int maxGather;
private final Object lock = new Object();
private final ArrayQueue<FrameEntry> queue = new ArrayQueue<>(16,16,lock);
private final Flusher flusher = new Flusher();
private final AtomicBoolean closed = new AtomicBoolean();
private volatile Throwable failure;
public FrameFlusher(ByteBufferPool bufferPool, Generator generator, EndPoint endpoint, int bufferSize, int maxGather)
{
this.bufferPool = bufferPool;
this.endpoint = endpoint;
this.bufferSize = bufferSize;
this.generator = Objects.requireNonNull(generator);
this.maxGather = maxGather;
}
public void close()
{
if (closed.compareAndSet(false,true))
{
LOG.debug("{} closing {}",this);
EOFException eof = new EOFException("Connection has been closed locally");
flusher.failed(eof);
// Fail also queued entries.
List<FrameEntry> entries = new ArrayList<>();
synchronized (lock)
{
entries.addAll(queue);
queue.clear();
}
// Notify outside sync block.
for (FrameEntry entry : entries)
{
notifyCallbackFailure(entry.callback,eof);
}
}
}
public void enqueue(Frame frame, WriteCallback callback, BatchMode batchMode)
{
if (closed.get())
{
notifyCallbackFailure(callback,new EOFException("Connection has been closed locally"));
return;
}
if (flusher.isFailed())
{
notifyCallbackFailure(callback,failure);
return;
}
FrameEntry entry = new FrameEntry(frame,callback,batchMode);
synchronized (lock)
{
switch (frame.getOpCode())
{
case OpCode.PING:
{
// Prepend PINGs so they are processed first.
queue.add(0,entry);
break;
}
case OpCode.CLOSE:
{
// There may be a chance that other frames are
// added after this close frame, but we will
// fail them later to keep it simple here.
closed.set(true);
queue.add(entry);
break;
}
default:
{
queue.add(entry);
break;
}
}
}
if (LOG.isDebugEnabled())
{
LOG.debug("{} queued {}",this,entry);
}
flusher.iterate();
}
protected void notifyCallbackFailure(WriteCallback callback, Throwable failure)
{
try
{
if (callback != null)
{
callback.writeFailed(failure);
}
}
catch (Throwable x)
{
LOG.debug("Exception while notifying failure of callback " + callback,x);
}
}
protected void notifyCallbackSuccess(WriteCallback callback)
{
try
{
if (callback != null)
{
callback.writeSuccess();
}
}
catch (Throwable x)
{
LOG.debug("Exception while notifying success of callback " + callback,x);
}
}
protected void onFailure(Throwable x)
{
LOG.warn(x);
}
@Override
public String toString()
{
ByteBuffer aggregate = flusher.aggregate;
return String.format("%s[queueSize=%d,aggregateSize=%d,failure=%s]",getClass().getSimpleName(),queue.size(),aggregate == null?0:aggregate.position(),
failure);
}
}

View File

@ -139,6 +139,10 @@ public class IOState
{
for (ConnectionStateListener listener : listeners)
{
if (LOG.isDebugEnabled())
{
LOG.debug("{}.onConnectionStateChange({})",listener.getClass().getSimpleName(),state.name());
}
listener.onConnectionStateChange(state);
}
}
@ -166,8 +170,7 @@ public class IOState
}
this.state = ConnectionState.CLOSED;
if (closeInfo == null)
this.closeInfo = close;
this.closeInfo = close;
this.inputAvailable = false;
this.outputAvailable = false;
this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
@ -193,16 +196,16 @@ public class IOState
if (initialState == ConnectionState.CONNECTED)
{
// fast close. a local close request from end-user onConnected() method
// fast close. a local close request from end-user onConnect/onOpen method
LOG.debug("FastClose in CONNECTED detected");
// Force the state open (to allow read/write to endpoint)
onOpened();
LOG.debug("FastClose continuing with Closure");
}
synchronized (this)
{
if (closeInfo == null)
closeInfo = close;
closeInfo = close;
boolean in = inputAvailable;
boolean out = outputAvailable;
@ -236,7 +239,6 @@ public class IOState
LOG.debug("notifying state listeners: {}",event);
notifyStateListeners(event);
/*
// if abnormal, we don't expect an answer.
if (close.isAbnormal())
{
@ -253,7 +255,6 @@ public class IOState
notifyStateListeners(event);
return;
}
*/
}
}
@ -272,8 +273,7 @@ public class IOState
return;
}
if (closeInfo == null)
closeInfo = close;
closeInfo = close;
boolean in = inputAvailable;
boolean out = outputAvailable;
@ -360,7 +360,7 @@ public class IOState
// already opened
return;
}
if (this.state != ConnectionState.CONNECTED)
{
LOG.debug("Unable to open, not in CONNECTED state: {}",this.state);
@ -394,12 +394,11 @@ public class IOState
return;
}
CloseInfo close = new CloseInfo(StatusCode.NO_CLOSE,"Read EOF");
CloseInfo close = new CloseInfo(StatusCode.ABNORMAL,"Read EOF");
this.cleanClose = false;
this.state = ConnectionState.CLOSED;
if (closeInfo == null)
this.closeInfo = close;
this.closeInfo = close;
this.inputAvailable = false;
this.outputAvailable = false;
this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
@ -408,6 +407,58 @@ public class IOState
notifyStateListeners(event);
}
public void onDisconnected()
{
ConnectionState event = null;
synchronized (this)
{
if (this.state == ConnectionState.CLOSED)
{
// already closed
return;
}
CloseInfo close = new CloseInfo(StatusCode.ABNORMAL,"Disconnected");
this.cleanClose = false;
this.state = ConnectionState.CLOSED;
this.closeInfo = close;
this.inputAvailable = false;
this.outputAvailable = false;
this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
event = this.state;
}
notifyStateListeners(event);
}
@Override
public String toString()
{
StringBuilder str = new StringBuilder();
str.append(this.getClass().getSimpleName());
str.append("@").append(Integer.toHexString(hashCode()));
str.append("[").append(state);
str.append(',');
if (!inputAvailable)
{
str.append('!');
}
str.append("in,");
if (!outputAvailable)
{
str.append('!');
}
str.append("out");
if ((state == ConnectionState.CLOSED) || (state == ConnectionState.CLOSING))
{
str.append(",close=").append(closeInfo);
str.append(",clean=").append(cleanClose);
str.append(",closeSource=").append(closeHandshakeSource);
}
str.append(']');
return str.toString();
}
public boolean wasAbnormalClose()
{
return closeHandshakeSource == CloseHandshakeSource.ABNORMAL;
@ -427,4 +478,5 @@ public class IOState
{
return closeHandshakeSource == CloseHandshakeSource.REMOTE;
}
}

View File

@ -392,4 +392,4 @@ public class ReflectUtils
}
return name;
}
}
}

View File

@ -0,0 +1,166 @@
//
// ========================================================================
// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.common;
import static org.eclipse.jetty.websocket.api.StatusCode.*;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.websocket.api.ProtocolException;
import org.eclipse.jetty.websocket.common.frames.CloseFrame;
import org.junit.Test;
public class CloseInfoTest
{
/**
* A test where no close is provided
*/
@Test
public void testAnonymousClose()
{
CloseInfo close = new CloseInfo();
assertThat("close.code",close.getStatusCode(),is(NO_CODE));
assertThat("close.reason",close.getReason(),nullValue());
CloseFrame frame = close.asFrame();
assertThat("close frame op code",frame.getOpCode(),is(OpCode.CLOSE));
// should result in no payload
assertThat("close frame has payload",frame.hasPayload(),is(false));
assertThat("close frame payload length",frame.getPayloadLength(),is(0));
}
/**
* A test where NO_CODE (1005) is provided
*/
@Test
public void testNoCode()
{
CloseInfo close = new CloseInfo(NO_CODE);
assertThat("close.code",close.getStatusCode(),is(NO_CODE));
assertThat("close.reason",close.getReason(),nullValue());
CloseFrame frame = close.asFrame();
assertThat("close frame op code",frame.getOpCode(),is(OpCode.CLOSE));
// should result in no payload
assertThat("close frame has payload",frame.hasPayload(),is(false));
assertThat("close frame payload length",frame.getPayloadLength(),is(0));
}
/**
* A test where NO_CLOSE (1006) is provided
*/
@Test
public void testNoClose()
{
CloseInfo close = new CloseInfo(NO_CLOSE);
assertThat("close.code",close.getStatusCode(),is(NO_CLOSE));
assertThat("close.reason",close.getReason(),nullValue());
CloseFrame frame = close.asFrame();
assertThat("close frame op code",frame.getOpCode(),is(OpCode.CLOSE));
// should result in no payload
assertThat("close frame has payload",frame.hasPayload(),is(false));
assertThat("close frame payload length",frame.getPayloadLength(),is(0));
}
/**
* A test of FAILED_TLS_HANDSHAKE (1007)
*/
@Test
public void testFailedTlsHandshake()
{
CloseInfo close = new CloseInfo(FAILED_TLS_HANDSHAKE);
assertThat("close.code",close.getStatusCode(),is(FAILED_TLS_HANDSHAKE));
assertThat("close.reason",close.getReason(),nullValue());
try
{
@SuppressWarnings("unused")
CloseFrame frame = close.asFrame();
fail("Expected " + ProtocolException.class.getName());
}
catch (ProtocolException e)
{
// expected path
assertThat("ProtocolException message",e.getMessage(),containsString("not allowed (per RFC6455)"));
}
}
/**
* A test of NORMAL (1000)
*/
@Test
public void testNormal()
{
CloseInfo close = new CloseInfo(NORMAL);
assertThat("close.code",close.getStatusCode(),is(NORMAL));
assertThat("close.reason",close.getReason(),nullValue());
CloseFrame frame = close.asFrame();
assertThat("close frame op code",frame.getOpCode(),is(OpCode.CLOSE));
assertThat("close frame payload length",frame.getPayloadLength(),is(2));
}
private ByteBuffer asByteBuffer(int statusCode, String reason)
{
int len = 2; // status code length
byte utf[] = null;
if (StringUtil.isNotBlank(reason))
{
utf = StringUtil.getUtf8Bytes(reason);
len += utf.length;
}
ByteBuffer buf = BufferUtil.allocate(len);
BufferUtil.flipToFill(buf);
buf.put((byte)((statusCode >>> 8) & 0xFF));
buf.put((byte)((statusCode >>> 0) & 0xFF));
if (utf != null)
{
buf.put(utf,0,utf.length);
}
BufferUtil.flipToFlush(buf,0);
return buf;
}
@Test
public void testFromFrame()
{
ByteBuffer payload = asByteBuffer(NORMAL,null);
assertThat("payload length", payload.remaining(), is(2));
CloseFrame frame = new CloseFrame();
frame.setPayload(payload);
// create from frame
CloseInfo close = new CloseInfo(frame);
assertThat("close.code",close.getStatusCode(),is(NORMAL));
assertThat("close.reason",close.getReason(),nullValue());
// and back again
frame = close.asFrame();
assertThat("close frame op code",frame.getOpCode(),is(OpCode.CLOSE));
assertThat("close frame payload length",frame.getPayloadLength(),is(2));
}
}

View File

@ -21,6 +21,7 @@ package org.eclipse.jetty.websocket.common.ab;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.log.StacklessLogging;
import org.eclipse.jetty.websocket.api.ProtocolException;
import org.eclipse.jetty.websocket.api.WebSocketBehavior;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
@ -39,8 +40,7 @@ public class TestABCase4
{
ByteBuffer expected = ByteBuffer.allocate(32);
expected.put(new byte[]
{ (byte)0x8b, 0x00 });
expected.put(new byte[] { (byte)0x8b, 0x00 });
expected.flip();
@ -50,10 +50,17 @@ public class TestABCase4
{
Parser parser = new UnitParser(policy);
parser.setIncomingFramesHandler(capture);
parser.parse(expected);
try
{
parser.parse(expected);
}
catch (ProtocolException ignore)
{
// ignore
}
}
Assert.assertEquals( "error on undefined opcode", 1, capture.getErrorCount(WebSocketException.class)) ;
Assert.assertEquals("error on undefined opcode",1,capture.getErrorCount(WebSocketException.class));
Throwable known = capture.getErrors().poll();
@ -65,8 +72,7 @@ public class TestABCase4
{
ByteBuffer expected = ByteBuffer.allocate(32);
expected.put(new byte[]
{ (byte)0x8c, 0x01, 0x00 });
expected.put(new byte[] { (byte)0x8c, 0x01, 0x00 });
expected.flip();
@ -76,24 +82,29 @@ public class TestABCase4
{
Parser parser = new UnitParser(policy);
parser.setIncomingFramesHandler(capture);
parser.parse(expected);
try
{
parser.parse(expected);
}
catch (ProtocolException ignore)
{
// ignore
}
}
Assert.assertEquals( "error on undefined opcode", 1, capture.getErrorCount(WebSocketException.class)) ;
Assert.assertEquals("error on undefined opcode",1,capture.getErrorCount(WebSocketException.class));
Throwable known = capture.getErrors().poll();
Assert.assertTrue("undefined option should be in message",known.getMessage().contains("Unknown opcode: 12"));
}
@Test
public void testParserNonControlOpCode3Case4_1_1() throws Exception
{
ByteBuffer expected = ByteBuffer.allocate(32);
expected.put(new byte[]
{ (byte)0x83, 0x00 });
expected.put(new byte[] { (byte)0x83, 0x00 });
expected.flip();
@ -103,10 +114,17 @@ public class TestABCase4
{
Parser parser = new UnitParser(policy);
parser.setIncomingFramesHandler(capture);
parser.parse(expected);
try
{
parser.parse(expected);
}
catch (ProtocolException ignore)
{
// ignore
}
}
Assert.assertEquals( "error on undefined opcode", 1, capture.getErrorCount(WebSocketException.class)) ;
Assert.assertEquals("error on undefined opcode",1,capture.getErrorCount(WebSocketException.class));
Throwable known = capture.getErrors().poll();
@ -118,8 +136,7 @@ public class TestABCase4
{
ByteBuffer expected = ByteBuffer.allocate(32);
expected.put(new byte[]
{ (byte)0x84, 0x01, 0x00 });
expected.put(new byte[] { (byte)0x84, 0x01, 0x00 });
expected.flip();
@ -129,10 +146,17 @@ public class TestABCase4
{
Parser parser = new UnitParser(policy);
parser.setIncomingFramesHandler(capture);
parser.parse(expected);
try
{
parser.parse(expected);
}
catch (ProtocolException ignore)
{
// ignore
}
}
Assert.assertEquals( "error on undefined opcode", 1, capture.getErrorCount(WebSocketException.class)) ;
Assert.assertEquals("error on undefined opcode",1,capture.getErrorCount(WebSocketException.class));
Throwable known = capture.getErrors().poll();

View File

@ -18,7 +18,8 @@
package org.eclipse.jetty.websocket.common.test;
import java.io.Closeable;
import static org.hamcrest.Matchers.*;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
@ -68,10 +69,6 @@ import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener;
import org.eclipse.jetty.websocket.common.io.http.HttpResponseHeaderParser;
import org.junit.Assert;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
/**
* A simple websocket client for performing unit tests with.
* <p>
@ -84,7 +81,7 @@ import static org.hamcrest.Matchers.notNullValue;
* with regards to basic IO behavior, a write should work as expected, a read should work as expected, but <u>what</u> byte it sends or reads is not within its
* scope.
*/
public class BlockheadClient implements IncomingFrames, OutgoingFrames, ConnectionStateListener, Closeable
public class BlockheadClient implements IncomingFrames, OutgoingFrames, ConnectionStateListener, AutoCloseable
{
private static final String REQUEST_HASH_KEY = "dGhlIHNhbXBsZSBub25jZQ==";
private static final int BUFFER_SIZE = 8192;
@ -182,22 +179,14 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames, Connecti
public void close(int statusCode, String message)
{
LOG.debug("close({},{})",statusCode,message);
CloseInfo close = new CloseInfo(statusCode,message);
ioState.onCloseLocal(close);
if (!ioState.isClosed())
{
WebSocketFrame frame = close.asFrame();
LOG.debug("Issuing: {}",frame);
try
{
write(frame);
}
catch (IOException e)
{
LOG.debug(e);
}
ioState.onCloseLocal(close);
} else {
LOG.debug("Not issuing close. ioState = {}",ioState);
}
}
@ -429,13 +418,8 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames, Connecti
{
LOG.info("Client parsed {} frames",count);
}
if (frame.getOpCode() == OpCode.CLOSE)
{
CloseInfo close = new CloseInfo(frame);
ioState.onCloseRemote(close);
}
// Capture Frame Copy
WebSocketFrame copy = WebSocketFrame.copy(frame);
incomingFrames.incomingFrame(copy);
}
@ -448,6 +432,7 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames, Connecti
@Override
public void onConnectionStateChange(ConnectionState state)
{
LOG.debug("CLIENT onConnectionStateChange() - {}", state);
switch (state)
{
case CLOSED:
@ -455,10 +440,17 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames, Connecti
// this.disconnect();
break;
case CLOSING:
if (ioState.wasRemoteCloseInitiated())
CloseInfo close = ioState.getCloseInfo();
WebSocketFrame frame = close.asFrame();
LOG.debug("Issuing: {}",frame);
try
{
CloseInfo close = ioState.getCloseInfo();
close(close.getStatusCode(),close.getReason());
write(frame);
}
catch (IOException e)
{
LOG.debug(e);
}
break;
default:
@ -701,6 +693,7 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames, Connecti
{
if (!ioState.isOpen())
{
LOG.debug("IO Not Open / Not Writing: {}",frame);
return;
}
LOG.debug("write(Frame->{}) to {}",frame,outgoing);

View File

@ -54,6 +54,7 @@ import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
import org.eclipse.jetty.websocket.api.extensions.Frame.Type;
import org.eclipse.jetty.websocket.common.AcceptHash;
import org.eclipse.jetty.websocket.common.CloseInfo;
import org.eclipse.jetty.websocket.common.Generator;
@ -124,7 +125,6 @@ public class BlockheadServer
{
write(new CloseFrame());
flush();
disconnect();
}
public void close(int statusCode) throws IOException
@ -132,7 +132,6 @@ public class BlockheadServer
CloseInfo close = new CloseInfo(statusCode);
write(close.asFrame());
flush();
disconnect();
}
public void disconnect()
@ -229,6 +228,19 @@ public class BlockheadServer
CloseInfo close = new CloseInfo(frame);
LOG.debug("Close frame: {}",close);
}
Type type = frame.getType();
if (echoing.get() && (type.isData() || type.isContinuation()))
{
try
{
write(WebSocketFrame.copy(frame));
}
catch (IOException e)
{
LOG.warn(e);
}
}
}
@Override
@ -317,9 +329,18 @@ public class BlockheadServer
return len;
}
/**
* @deprecated use {@link #readFrames(int, int, TimeUnit)} for correct parameter order
*/
@Deprecated
public IncomingFramesCapture readFrames(int expectedCount, TimeUnit timeoutUnit, int timeoutDuration) throws IOException, TimeoutException
{
LOG.debug("Read: waiting for {} frame(s) from server",expectedCount);
return readFrames(expectedCount,timeoutDuration,timeoutUnit);
}
public IncomingFramesCapture readFrames(int expectedCount, int timeoutDuration, TimeUnit timeoutUnit) throws IOException, TimeoutException
{
LOG.debug("Read: waiting for {} frame(s) from client",expectedCount);
int startCount = incomingFrames.size();
ByteBuffer buf = bufferPool.acquire(BUFFER_SIZE,false);
@ -562,13 +583,22 @@ public class BlockheadServer
public void write(Frame frame) throws IOException
{
LOG.debug("write(Frame->{}) to {}",frame,outgoing);
outgoing.outgoingFrame(frame,null, BatchMode.OFF);
outgoing.outgoingFrame(frame,null,BatchMode.OFF);
}
public void write(int b) throws IOException
{
getOutputStream().write(b);
}
public void write(ByteBuffer buf) throws IOException
{
byte arr[] = BufferUtil.toArray(buf);
if ((arr != null) && (arr.length > 0))
{
getOutputStream().write(arr);
}
}
}
private static final Logger LOG = Log.getLogger(BlockheadServer.class);

View File

@ -44,6 +44,7 @@ import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.websocket.api.InvalidWebSocketException;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory;
@ -199,11 +200,23 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
}
}
protected void closeAllConnections()
protected void shutdownAllConnections()
{
for (WebSocketSession session : openSessions)
{
session.close();
if (session.getConnection() != null)
{
try
{
session.getConnection().close(
StatusCode.SHUTDOWN,
"Shutdown");
}
catch (Throwable t)
{
LOG.debug("During Shutdown All Connections",t);
}
}
}
openSessions.clear();
}
@ -269,7 +282,7 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
@Override
protected void doStop() throws Exception
{
closeAllConnections();
shutdownAllConnections();
super.doStop();
}

View File

@ -114,7 +114,7 @@ public class WebSocketCloseTest
public void onWebSocketConnect(Session sess)
{
LOG.debug("onWebSocketConnect({})",sess);
sess.close();
sess.close(StatusCode.NORMAL,"FastCloseServer");
}
}
@ -129,14 +129,10 @@ public class WebSocketCloseTest
public void onWebSocketConnect(Session sess)
{
LOG.debug("onWebSocketConnect({})",sess);
// Test failure due to unhandled exception
// this should trigger a fast-fail closure during open/connect
throw new RuntimeException("Intentional FastFail");
}
@Override
public void onWebSocketError(Throwable cause)
{
errors.add(cause);
}
}
private static final Logger LOG = Log.getLogger(WebSocketCloseTest.class);
@ -163,30 +159,28 @@ public class WebSocketCloseTest
@Test
public void testFastClose() throws Exception
{
BlockheadClient client = new BlockheadClient(server.getServerUri());
client.setProtocols("fastclose");
client.setTimeout(TimeUnit.SECONDS,1);
try
try (BlockheadClient client = new BlockheadClient(server.getServerUri()))
{
client.setProtocols("fastclose");
client.setTimeout(TimeUnit.SECONDS,1);
client.connect();
client.sendStandardRequest();
client.expectUpgradeResponse();
// Verify that client got close frame
IncomingFramesCapture capture = client.readFrames(1,TimeUnit.SECONDS,1);
WebSocketFrame frame = capture.getFrames().poll();
Assert.assertThat("frames[0].opcode",frame.getOpCode(),is(OpCode.CLOSE));
CloseInfo close = new CloseInfo(frame);
Assert.assertThat("Close Status Code",close.getStatusCode(),is(StatusCode.NORMAL));
// Notify server of close handshake
client.write(close.asFrame()); // respond with close
// ensure server socket got close event
Assert.assertThat("Fast Close Latch",closeSocket.closeLatch.await(1,TimeUnit.SECONDS),is(true));
Assert.assertThat("Fast Close.statusCode",closeSocket.closeStatusCode,is(StatusCode.NORMAL));
}
finally
{
client.close();
}
}
/**
@ -195,11 +189,10 @@ public class WebSocketCloseTest
@Test
public void testFastFail() throws Exception
{
BlockheadClient client = new BlockheadClient(server.getServerUri());
client.setProtocols("fastfail");
client.setTimeout(TimeUnit.SECONDS,1);
try
try (BlockheadClient client = new BlockheadClient(server.getServerUri()))
{
client.setProtocols("fastfail");
client.setTimeout(TimeUnit.SECONDS,1);
try (StacklessLogging scope = new StacklessLogging(AbstractEventDriver.class))
{
client.connect();
@ -214,14 +207,11 @@ public class WebSocketCloseTest
client.write(close.asFrame()); // respond with close
// ensure server socket got close event
Assert.assertThat("Fast Fail Latch",closeSocket.closeLatch.await(1,TimeUnit.SECONDS),is(true));
Assert.assertThat("Fast Fail.statusCode",closeSocket.closeStatusCode,is(StatusCode.SERVER_ERROR));
Assert.assertThat("Fast Fail.errors",closeSocket.errors.size(),is(1));
}
}
finally
{
client.close();
}
}
}