Merge branch 'jetty-9' of ssh://git.eclipse.org/gitroot/jetty/org.eclipse.jetty.project into jetty-plugin

This commit is contained in:
Thomas Becker 2012-09-05 10:42:02 +02:00
commit 8076d86376
20 changed files with 1291 additions and 442 deletions

View File

@ -0,0 +1,133 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.client;
import static org.hamcrest.Matchers.*;
import java.net.URI;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.api.WebSocketConnection;
import org.eclipse.jetty.websocket.client.blockhead.BlockheadServer;
import org.eclipse.jetty.websocket.client.blockhead.BlockheadServer.ServerConnection;
import org.eclipse.jetty.websocket.io.AbstractWebSocketConnection;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
/**
* Tests for conditions due to bad networking.
*/
@Ignore("Not working yet")
public class BadNetworkTest
{
private BlockheadServer server;
private WebSocketClientFactory factory;
@Before
public void startFactory() throws Exception
{
factory = new WebSocketClientFactory();
factory.getPolicy().setIdleTimeout(250);
factory.start();
}
@Before
public void startServer() throws Exception
{
server = new BlockheadServer();
server.start();
}
@After
public void stopFactory() throws Exception
{
factory.stop();
}
@After
public void stopServer() throws Exception
{
server.stop();
}
@Test
public void testAbruptClientClose() throws Exception
{
TrackingSocket wsocket = new TrackingSocket();
WebSocketClient client = factory.newWebSocketClient(wsocket);
URI wsUri = server.getWsUri();
Future<UpgradeResponse> future = client.connect(wsUri);
ServerConnection ssocket = server.accept();
ssocket.upgrade();
// Validate that we are connected
future.get(500,TimeUnit.MILLISECONDS);
wsocket.waitForConnected(500,TimeUnit.MILLISECONDS);
// Have client disconnect abruptly
WebSocketConnection conn = wsocket.getConnection();
Assert.assertThat("Connection",conn,instanceOf(AbstractWebSocketConnection.class));
AbstractWebSocketConnection awsc = (AbstractWebSocketConnection)conn;
awsc.disconnect(false);
// Client Socket should see close
wsocket.waitForClose(10,TimeUnit.SECONDS);
// Client Socket should see a close event, with status NO_CLOSE
// This event is automatically supplied by the underlying WebSocketClientConnection
// in the situation of a bad network connection.
wsocket.assertCloseCode(StatusCode.NO_CLOSE);
}
@Test
public void testAbruptServerClose() throws Exception
{
TrackingSocket wsocket = new TrackingSocket();
WebSocketClient client = factory.newWebSocketClient(wsocket);
URI wsUri = server.getWsUri();
Future<UpgradeResponse> future = client.connect(wsUri);
ServerConnection ssocket = server.accept();
ssocket.upgrade();
// Validate that we are connected
future.get(500,TimeUnit.MILLISECONDS);
wsocket.waitForConnected(500,TimeUnit.MILLISECONDS);
// Have server disconnect abruptly
ssocket.disconnect();
// Wait for close
wsocket.waitForClose(10,TimeUnit.SECONDS);
// Client Socket should see a close event, with status NO_CLOSE
// This event is automatically supplied by the underlying WebSocketClientConnection
// in the situation of a bad network connection.
wsocket.assertCloseCode(StatusCode.NO_CLOSE);
}
}

View File

@ -0,0 +1,201 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.client;
import java.net.ConnectException;
import java.net.URI;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.websocket.api.UpgradeException;
import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.client.blockhead.BlockheadServer;
import org.eclipse.jetty.websocket.client.blockhead.BlockheadServer.ServerConnection;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
/**
* Various connect condition testing
*/
public class ClientConnectTest
{
private BlockheadServer server;
private WebSocketClientFactory factory;
@Before
public void startFactory() throws Exception
{
factory = new WebSocketClientFactory();
factory.start();
}
@Before
public void startServer() throws Exception
{
server = new BlockheadServer();
server.start();
}
@After
public void stopFactory() throws Exception
{
factory.stop();
}
@After
public void stopServer() throws Exception
{
server.stop();
}
@Test(expected = UpgradeException.class)
public void testBadHandshake() throws Exception
{
TrackingSocket wsocket = new TrackingSocket();
WebSocketClient client = factory.newWebSocketClient(wsocket);
URI wsUri = server.getWsUri();
FutureCallback<UpgradeResponse> future = client.connect(wsUri);
ServerConnection connection = server.accept();
connection.readRequest();
// no upgrade, just fail with a 404 error
connection.respond("HTTP/1.1 404 NOT FOUND\r\n\r\n");
// The attempt to get upgrade response future should throw error
try
{
future.get(500,TimeUnit.MILLISECONDS);
Assert.fail("Expected ExecutionException -> UpgradeException");
}
catch (ExecutionException e)
{
// Expected Path - throw underlying exception
FutureCallback.rethrow(e);
}
}
@Test(expected = UpgradeException.class)
public void testBadUpgrade() throws Exception
{
TrackingSocket wsocket = new TrackingSocket();
WebSocketClient client = factory.newWebSocketClient(wsocket);
URI wsUri = server.getWsUri();
FutureCallback<UpgradeResponse> future = client.connect(wsUri);
ServerConnection connection = server.accept();
connection.readRequest();
// Upgrade badly
connection.respond("HTTP/1.1 101 Upgrade\r\n" + "Sec-WebSocket-Accept: rubbish\r\n" + "\r\n");
// The attempt to get upgrade response future should throw error
try
{
future.get(500,TimeUnit.MILLISECONDS);
Assert.fail("Expected ExecutionException -> UpgradeException");
}
catch (ExecutionException e)
{
// Expected Path - throw underlying exception
FutureCallback.rethrow(e);
}
}
@Test
public void testConnectionNotAccepted() throws Exception
{
TrackingSocket wsocket = new TrackingSocket();
WebSocketClient client = factory.newWebSocketClient(wsocket);
URI wsUri = server.getWsUri();
Future<UpgradeResponse> future = client.connect(wsUri);
// Intentionally not accept incoming socket.
// server.accept();
try
{
future.get(500,TimeUnit.MILLISECONDS);
Assert.fail("Should have Timed Out");
}
catch (TimeoutException e)
{
// Expected Path
wsocket.assertNotOpened();
}
}
@Test(expected = ConnectException.class)
@Ignore("Need to get information about connection issue out of SelectManager somehow")
public void testConnectionRefused() throws Exception
{
TrackingSocket wsocket = new TrackingSocket();
WebSocketClient client = factory.newWebSocketClient(wsocket);
// Intentionally bad port
URI wsUri = new URI("ws://127.0.0.1:1");
Future<UpgradeResponse> future = client.connect(wsUri);
// The attempt to get upgrade response future should throw error
try
{
future.get(1000,TimeUnit.MILLISECONDS);
Assert.fail("Expected ExecutionException -> ConnectException");
}
catch (ExecutionException e)
{
// Expected Path - throw underlying exception
FutureCallback.rethrow(e);
}
}
@Test(expected = TimeoutException.class)
public void testConnectionTimeout() throws Exception
{
TrackingSocket wsocket = new TrackingSocket();
WebSocketClient client = factory.newWebSocketClient(wsocket);
URI wsUri = server.getWsUri();
Future<UpgradeResponse> future = client.connect(wsUri);
ServerConnection ssocket = server.accept();
Assert.assertNotNull(ssocket);
// Intentionally don't upgrade
// ssocket.upgrade();
// The attempt to get upgrade response future should throw error
try
{
future.get(500,TimeUnit.MILLISECONDS);
Assert.fail("Expected ExecutionException -> TimeoutException");
}
catch (ExecutionException e)
{
// Expected Path - throw underlying exception
FutureCallback.rethrow(e);
}
}
}

View File

@ -0,0 +1,115 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.client;
import java.io.IOException;
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.WebSocketConnection;
public class ClientWriteThread extends Thread
{
private static final Logger LOG = Log.getLogger(ClientWriteThread.class);
private final WebSocketConnection conn;
private Exchanger<String> exchanger;
private int slowness = -1;
private int messageCount = 100;
private String message = "Hello";
public ClientWriteThread(WebSocketConnection conn)
{
this.conn = conn;
}
public Exchanger<String> getExchanger()
{
return exchanger;
}
public String getMessage()
{
return message;
}
public int getMessageCount()
{
return messageCount;
}
public int getSlowness()
{
return slowness;
}
@Override
public void run()
{
final AtomicInteger m = new AtomicInteger();
try
{
while (m.get() < messageCount)
{
conn.write(null,new FutureCallback<Void>(),message);
if (exchanger != null)
{
// synchronized on exchange
exchanger.exchange(message);
}
m.incrementAndGet();
if (slowness > 0)
{
TimeUnit.MILLISECONDS.sleep(slowness);
}
}
}
catch (InterruptedException | IOException e)
{
LOG.warn(e);
}
}
public void setExchanger(Exchanger<String> exchanger)
{
this.exchanger = exchanger;
}
public void setMessage(String message)
{
this.message = message;
}
public void setMessageCount(int messageCount)
{
this.messageCount = messageCount;
}
public void setSlowness(int slowness)
{
this.slowness = slowness;
}
}

View File

@ -0,0 +1,140 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.client;
import static org.hamcrest.Matchers.*;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.client.blockhead.BlockheadServer.ServerConnection;
import org.eclipse.jetty.websocket.protocol.CloseInfo;
import org.eclipse.jetty.websocket.protocol.OpCode;
import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
import org.junit.Assert;
public class ServerReadThread extends Thread
{
private static final Logger LOG = Log.getLogger(ServerReadThread.class);
private final ServerConnection conn;
private boolean active = true;
private int slowness = -1; // disabled is default
private AtomicInteger frameCount = new AtomicInteger();
private CountDownLatch expectedMessageCount;
public ServerReadThread(ServerConnection conn)
{
this.conn = conn;
this.expectedMessageCount = new CountDownLatch(1);
}
public void cancel()
{
active = false;
}
public int getFrameCount()
{
return frameCount.get();
}
public int getSlowness()
{
return slowness;
}
@Override
public void run()
{
ByteBufferPool bufferPool = conn.getBufferPool();
WebSocketPolicy policy = conn.getPolicy();
ByteBuffer buf = bufferPool.acquire(policy.getBufferSize(),false);
BufferUtil.clearToFill(buf);
int len = 0;
try
{
while (active)
{
BufferUtil.clearToFill(buf);
len = conn.read(buf);
if (len > 0)
{
LOG.debug("Read {} bytes",len);
BufferUtil.flipToFlush(buf,0);
conn.getParser().parse(buf);
}
LinkedList<WebSocketFrame> frames = conn.getIncomingFrames().getFrames();
WebSocketFrame frame;
while ((frame = frames.poll()) != null)
{
frameCount.incrementAndGet();
if (frame.getOpCode() == OpCode.CLOSE)
{
active = false;
// automatically response to close frame
CloseInfo close = new CloseInfo(frame);
conn.close(close.getStatusCode());
}
expectedMessageCount.countDown();
}
if (slowness > 0)
{
TimeUnit.MILLISECONDS.sleep(slowness);
}
}
}
catch (IOException | InterruptedException e)
{
LOG.warn(e);
}
finally
{
bufferPool.release(buf);
}
}
public void setExpectedMessageCount(int expectedMessageCount)
{
this.expectedMessageCount = new CountDownLatch(expectedMessageCount);
}
public void setSlowness(int slowness)
{
this.slowness = slowness;
}
public void waitForExpectedMessageCount(int timeoutDuration, TimeUnit timeoutUnit) throws InterruptedException
{
Assert.assertThat("Expected Message Count attained",expectedMessageCount.await(timeoutDuration,timeoutUnit),is(true));
}
}

View File

@ -0,0 +1,115 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.client;
import java.io.IOException;
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.client.blockhead.BlockheadServer.ServerConnection;
import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
public class ServerWriteThread extends Thread
{
private static final Logger LOG = Log.getLogger(ServerWriteThread.class);
private final ServerConnection conn;
private Exchanger<String> exchanger;
private int slowness = -1;
private int messageCount = 100;
private String message = "Hello";
public ServerWriteThread(ServerConnection conn)
{
this.conn = conn;
}
public Exchanger<String> getExchanger()
{
return exchanger;
}
public String getMessage()
{
return message;
}
public int getMessageCount()
{
return messageCount;
}
public int getSlowness()
{
return slowness;
}
@Override
public void run()
{
final AtomicInteger m = new AtomicInteger();
try
{
while (m.get() < messageCount)
{
conn.write(WebSocketFrame.text(message));
if (exchanger != null)
{
// synchronized on exchange
exchanger.exchange(message);
}
m.incrementAndGet();
if (slowness > 0)
{
TimeUnit.MILLISECONDS.sleep(slowness);
}
}
}
catch (InterruptedException | IOException e)
{
LOG.warn(e);
}
}
public void setExchanger(Exchanger<String> exchanger)
{
this.exchanger = exchanger;
}
public void setMessage(String message)
{
this.message = message;
}
public void setMessageCount(int messageCount)
{
this.messageCount = messageCount;
}
public void setSlowness(int slowness)
{
this.slowness = slowness;
}
}

View File

@ -0,0 +1,117 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.client;
import static org.hamcrest.Matchers.*;
import java.net.URI;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.client.blockhead.BlockheadServer;
import org.eclipse.jetty.websocket.client.blockhead.BlockheadServer.ServerConnection;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class SlowClientTest
{
private BlockheadServer server;
private WebSocketClientFactory factory;
@Before
public void startFactory() throws Exception
{
factory = new WebSocketClientFactory();
factory.getPolicy().setIdleTimeout(60000);
factory.start();
}
@Before
public void startServer() throws Exception
{
server = new BlockheadServer();
server.start();
}
@After
public void stopFactory() throws Exception
{
factory.stop();
}
@After
public void stopServer() throws Exception
{
server.stop();
}
@Test
@Slow
public void testClientSlowToSend() throws Exception
{
// final Exchanger<String> exchanger = new Exchanger<String>();
TrackingSocket tsocket = new TrackingSocket();
// tsocket.messageExchanger = exchanger;
WebSocketClient client = factory.newWebSocketClient(tsocket);
client.getPolicy().setIdleTimeout(60000);
URI wsUri = server.getWsUri();
Future<UpgradeResponse> future = client.connect(wsUri);
ServerConnection sconnection = server.accept();
sconnection.setSoTimeout(60000);
sconnection.upgrade();
// Confirm connected
future.get(500,TimeUnit.MILLISECONDS);
tsocket.waitForConnected(500,TimeUnit.MILLISECONDS);
// Setup server read thread
ServerReadThread reader = new ServerReadThread(sconnection);
reader.setExpectedMessageCount(Integer.MAX_VALUE); // keep reading till I tell you to stop
reader.start();
// Have client write slowly.
int messageCount = 1000;
ClientWriteThread writer = new ClientWriteThread(tsocket.getConnection());
writer.setMessageCount(messageCount);
writer.setMessage("Hello");
// writer.setExchanger(exchanger);
writer.setSlowness(50);
writer.start();
writer.join();
// Verify receive
Assert.assertThat("Frame Receive Count",reader.getFrameCount(),is(messageCount));
// Close
tsocket.getConnection().close(StatusCode.NORMAL,"Done");
Assert.assertTrue("Client Socket Closed",tsocket.closeLatch.await(10,TimeUnit.SECONDS));
tsocket.assertCloseCode(StatusCode.NORMAL);
reader.cancel(); // stop reading
}
}

View File

@ -0,0 +1,161 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.client;
import static org.hamcrest.Matchers.*;
import java.net.URI;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.client.blockhead.BlockheadServer;
import org.eclipse.jetty.websocket.client.blockhead.BlockheadServer.ServerConnection;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class SlowServerTest
{
private BlockheadServer server;
private WebSocketClientFactory factory;
@Before
public void startFactory() throws Exception
{
factory = new WebSocketClientFactory();
factory.getPolicy().setIdleTimeout(60000);
factory.start();
}
@Before
public void startServer() throws Exception
{
server = new BlockheadServer();
server.start();
}
@After
public void stopFactory() throws Exception
{
factory.stop();
}
@After
public void stopServer() throws Exception
{
server.stop();
}
@Test
@Slow
public void testServerSlowToRead() throws Exception
{
// final Exchanger<String> exchanger = new Exchanger<String>();
TrackingSocket tsocket = new TrackingSocket();
// tsocket.messageExchanger = exchanger;
WebSocketClient client = factory.newWebSocketClient(tsocket);
client.getPolicy().setIdleTimeout(60000);
URI wsUri = server.getWsUri();
Future<UpgradeResponse> future = client.connect(wsUri);
ServerConnection sconnection = server.accept();
sconnection.setSoTimeout(60000);
sconnection.upgrade();
// Confirm connected
future.get(500,TimeUnit.MILLISECONDS);
tsocket.waitForConnected(500,TimeUnit.MILLISECONDS);
int messageCount = 10; // TODO: increase to 1000
// Setup slow server read thread
ServerReadThread reader = new ServerReadThread(sconnection);
reader.setExpectedMessageCount(messageCount);
reader.setSlowness(100); // slow it down
reader.start();
// Have client write as quickly as it can.
ClientWriteThread writer = new ClientWriteThread(tsocket.getConnection());
writer.setMessageCount(messageCount);
writer.setMessage("Hello");
// writer.setExchanger(exchanger);
writer.setSlowness(-1); // disable slowness
writer.start();
writer.join();
// Verify receive
reader.waitForExpectedMessageCount(10,TimeUnit.SECONDS);
Assert.assertThat("Frame Receive Count",reader.getFrameCount(),is(messageCount));
// Close
tsocket.getConnection().close(StatusCode.NORMAL,"Done");
Assert.assertTrue("Client Socket Closed",tsocket.closeLatch.await(10,TimeUnit.SECONDS));
tsocket.assertCloseCode(StatusCode.NORMAL);
reader.cancel(); // stop reading
}
@Test
@Slow
public void testServerSlowToSend() throws Exception
{
// final Exchanger<String> exchanger = new Exchanger<String>();
TrackingSocket tsocket = new TrackingSocket();
// tsocket.messageExchanger = exchanger;
WebSocketClient client = factory.newWebSocketClient(tsocket);
client.getPolicy().setIdleTimeout(60000);
URI wsUri = server.getWsUri();
Future<UpgradeResponse> future = client.connect(wsUri);
ServerConnection sconnection = server.accept();
sconnection.setSoTimeout(60000);
sconnection.upgrade();
// Confirm connected
future.get(500,TimeUnit.MILLISECONDS);
tsocket.waitForConnected(500,TimeUnit.MILLISECONDS);
// Have server write slowly.
int messageCount = 1000;
ServerWriteThread writer = new ServerWriteThread(sconnection);
writer.setMessageCount(messageCount);
writer.setMessage("Hello");
// writer.setExchanger(exchanger);
writer.setSlowness(50);
writer.start();
writer.join();
// Verify receive
Assert.assertThat("Message Receive Count",tsocket.messageQueue.size(),is(messageCount));
// Close
sconnection.close(StatusCode.NORMAL);
Assert.assertTrue("Client Socket Closed",tsocket.closeLatch.await(10,TimeUnit.SECONDS));
tsocket.assertCloseCode(StatusCode.NORMAL);
}
}

View File

@ -0,0 +1,105 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.client;
import static org.hamcrest.Matchers.*;
import java.net.URI;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.client.blockhead.BlockheadServer;
import org.eclipse.jetty.websocket.client.blockhead.BlockheadServer.ServerConnection;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
/**
* Various tests for Timeout handling
*/
@Ignore("Idle timeouts not working yet")
public class TimeoutTest
{
private BlockheadServer server;
private WebSocketClientFactory factory;
@Before
public void startFactory() throws Exception
{
factory = new WebSocketClientFactory();
factory.getPolicy().setIdleTimeout(250); // idle timeout (for all tests here)
factory.start();
}
@Before
public void startServer() throws Exception
{
server = new BlockheadServer();
server.start();
}
@After
public void stopFactory() throws Exception
{
factory.stop();
}
@After
public void stopServer() throws Exception
{
server.stop();
}
/**
* In a situation where the upgrade/connection is successfull, and there is no activity for a while, the idle timeout triggers on the client side and
* automatically initiates a close handshake.
*/
@Test
public void testIdleDetectedByClient() throws Exception
{
TrackingSocket wsocket = new TrackingSocket();
WebSocketClient client = factory.newWebSocketClient(wsocket);
URI wsUri = server.getWsUri();
Future<UpgradeResponse> future = client.connect(wsUri);
ServerConnection ssocket = server.accept();
ssocket.upgrade();
// Validate that connect occurred
future.get(500,TimeUnit.MILLISECONDS);
wsocket.waitForConnected(500,TimeUnit.MILLISECONDS);
// Wait for inactivity idle timeout.
long start = System.currentTimeMillis();
wsocket.waitForClose(10,TimeUnit.SECONDS);
long end = System.currentTimeMillis();
long dur = (end - start);
// Make sure idle timeout takes less than 5 total seconds
Assert.assertThat("Idle Timeout",dur,lessThanOrEqualTo(5000L));
// Client should see a close event, with status NO_CLOSE
wsocket.assertCloseCode(StatusCode.NORMAL);
}
}

View File

@ -18,6 +18,8 @@
package org.eclipse.jetty.websocket.client; package org.eclipse.jetty.websocket.client;
import static org.hamcrest.Matchers.*;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Exchanger; import java.util.concurrent.Exchanger;
@ -31,9 +33,6 @@ import org.eclipse.jetty.websocket.api.WebSocketAdapter;
import org.eclipse.jetty.websocket.api.WebSocketConnection; import org.eclipse.jetty.websocket.api.WebSocketConnection;
import org.junit.Assert; import org.junit.Assert;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
/** /**
* Testing Socket used on client side WebSocket testing. * Testing Socket used on client side WebSocket testing.
*/ */
@ -57,7 +56,7 @@ public class TrackingSocket extends WebSocketAdapter
public void assertCloseCode(int expectedCode) throws InterruptedException public void assertCloseCode(int expectedCode) throws InterruptedException
{ {
Assert.assertThat("Was Closed",closeLatch.await(500,TimeUnit.MILLISECONDS),is(true)); Assert.assertThat("Was Closed",closeLatch.await(50,TimeUnit.MILLISECONDS),is(true));
Assert.assertThat("Close Code",closeCode,is(expectedCode)); Assert.assertThat("Close Code",closeCode,is(expectedCode));
} }
@ -163,7 +162,17 @@ public class TrackingSocket extends WebSocketAdapter
} }
} }
public void waitForMessage(TimeUnit timeoutUnit, int timeoutDuration) throws InterruptedException public void waitForClose(int timeoutDuration, TimeUnit timeoutUnit) throws InterruptedException
{
Assert.assertThat("Client Socket Closed",closeLatch.await(timeoutDuration,timeoutUnit),is(true));
}
public void waitForConnected(int timeoutDuration, TimeUnit timeoutUnit) throws InterruptedException
{
Assert.assertThat("Client Socket Connected",openLatch.await(timeoutDuration,timeoutUnit),is(true));
}
public void waitForMessage(int timeoutDuration, TimeUnit timeoutUnit) throws InterruptedException
{ {
Assert.assertThat("Message Received",dataLatch.await(timeoutDuration,timeoutUnit),is(true)); Assert.assertThat("Message Received",dataLatch.await(timeoutDuration,timeoutUnit),is(true));
} }

View File

@ -18,20 +18,14 @@
package org.eclipse.jetty.websocket.client; package org.eclipse.jetty.websocket.client;
import java.net.ConnectException; import static org.hamcrest.Matchers.*;
import java.net.URI; import java.net.URI;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.toolchain.test.AdvancedRunner; import org.eclipse.jetty.toolchain.test.AdvancedRunner;
import org.eclipse.jetty.util.FutureCallback; import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.UpgradeException;
import org.eclipse.jetty.websocket.api.UpgradeRequest; import org.eclipse.jetty.websocket.api.UpgradeRequest;
import org.eclipse.jetty.websocket.api.UpgradeResponse; import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.client.blockhead.BlockheadServer; import org.eclipse.jetty.websocket.client.blockhead.BlockheadServer;
@ -40,14 +34,8 @@ import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
@RunWith(AdvancedRunner.class) @RunWith(AdvancedRunner.class)
public class WebSocketClientTest public class WebSocketClientTest
@ -81,58 +69,6 @@ public class WebSocketClientTest
server.stop(); server.stop();
} }
@Test(expected = UpgradeException.class)
public void testBadHandshake() throws Exception
{
TrackingSocket wsocket = new TrackingSocket();
WebSocketClient client = factory.newWebSocketClient(wsocket);
URI wsUri = server.getWsUri();
FutureCallback<UpgradeResponse> future = client.connect(wsUri);
ServerConnection connection = server.accept();
connection.readRequest();
// no upgrade, just fail with a 404 error
connection.respond("HTTP/1.1 404 NOT FOUND\r\n\r\n");
// The attempt to get upgrade response future should throw error
try
{
future.get(500,TimeUnit.MILLISECONDS);
Assert.fail("Expected ExecutionException -> UpgradeException");
}
catch (ExecutionException e)
{
FutureCallback.rethrow(e);
}
}
@Test(expected = UpgradeException.class)
public void testBadUpgrade() throws Exception
{
TrackingSocket wsocket = new TrackingSocket();
WebSocketClient client = factory.newWebSocketClient(wsocket);
URI wsUri = server.getWsUri();
FutureCallback<UpgradeResponse> future = client.connect(wsUri);
ServerConnection connection = server.accept();
connection.readRequest();
// Upgrade badly
connection.respond("HTTP/1.1 101 Upgrade\r\n" + "Sec-WebSocket-Accept: rubbish\r\n" + "\r\n");
// The attempt to get upgrade response future should throw error
try
{
future.get(500,TimeUnit.MILLISECONDS);
Assert.fail("Expected ExecutionException -> UpgradeException");
}
catch (ExecutionException e)
{
FutureCallback.rethrow(e);
}
}
@Test @Test
public void testBasicEcho_FromClient() throws Exception public void testBasicEcho_FromClient() throws Exception
{ {
@ -161,7 +97,7 @@ public class WebSocketClientTest
cliSock.getConnection().write(null,new FutureCallback<Void>(),"Hello World!"); cliSock.getConnection().write(null,new FutureCallback<Void>(),"Hello World!");
srvSock.echoMessage(1,TimeUnit.MILLISECONDS,500); srvSock.echoMessage(1,TimeUnit.MILLISECONDS,500);
// wait for response from server // wait for response from server
cliSock.waitForMessage(TimeUnit.MILLISECONDS,500); cliSock.waitForMessage(500,TimeUnit.MILLISECONDS);
cliSock.assertMessage("Hello World!"); cliSock.assertMessage("Hello World!");
} }
@ -193,263 +129,6 @@ public class WebSocketClientTest
wsocket.assertMessage("Hello World"); wsocket.assertMessage("Hello World");
} }
@Test
public void testBlockReceiving() throws Exception
{
final Exchanger<String> exchanger = new Exchanger<String>();
TrackingSocket tsocket = new TrackingSocket();
tsocket.messageExchanger = exchanger;
WebSocketClient client = factory.newWebSocketClient(tsocket);
client.getPolicy().setIdleTimeout(60000);
URI wsUri = server.getWsUri();
Future<UpgradeResponse> future = client.connect(wsUri);
ServerConnection sconnection = server.accept();
sconnection.setSoTimeout(60000);
sconnection.upgrade();
future.get(500,TimeUnit.MILLISECONDS);
// define some messages to send server to client
byte[] send = new byte[]
{ (byte)0x81, (byte)0x05, (byte)'H', (byte)'e', (byte)'l', (byte)'l', (byte)'o' };
final int messages = 100000;
final AtomicInteger m = new AtomicInteger();
// Set up a consumer of received messages that waits a while before consuming
Thread consumer = new Thread()
{
@Override
public void run()
{
try
{
Thread.sleep(200);
while (m.get() < messages)
{
String msg = exchanger.exchange(null);
if ("Hello".equals(msg))
{
m.incrementAndGet();
}
else
{
throw new IllegalStateException("exchanged " + msg);
}
if ((m.get() % 1000) == 0)
{
// Artificially slow reader
Thread.sleep(10);
}
}
}
catch (InterruptedException e)
{
return;
}
catch (Exception e)
{
e.printStackTrace();
}
}
};
consumer.start();
long start = System.currentTimeMillis();
for (int i = 0; i < messages; i++)
{
sconnection.write(send,0,send.length);
sconnection.flush();
}
while (consumer.isAlive())
{
Thread.sleep(10);
}
// Duration of the read operation.
long readDur = (System.currentTimeMillis() - start);
Assert.assertThat("read duration",readDur,greaterThan(1000L)); // reading was blocked
Assert.assertEquals(m.get(),messages);
// Close with code
sconnection.close(StatusCode.NORMAL);
Assert.assertTrue("Client Socket Closed",tsocket.closeLatch.await(10,TimeUnit.SECONDS));
tsocket.assertCloseCode(StatusCode.NORMAL);
}
@Test
@Ignore("Not working, it hangs")
public void testBlockSending() throws Exception
{
TrackingSocket wsocket = new TrackingSocket();
WebSocketClient client = factory.newWebSocketClient(wsocket);
client.getPolicy().setIdleTimeout(10000);
URI wsUri = server.getWsUri();
Future<UpgradeResponse> future = client.connect(wsUri);
final ServerConnection ssocket = server.accept();
ssocket.upgrade();
future.get(250,TimeUnit.MILLISECONDS);
final int messages = 200000;
final AtomicLong totalB = new AtomicLong();
Thread consumer = new Thread()
{
@Override
public void run()
{
// Thread.sleep is for artificially poor performance reader needed for this testcase.
try
{
Thread.sleep(200);
byte[] recv = new byte[32 * 1024];
int len = 0;
while (len >= 0)
{
totalB.addAndGet(len);
len = ssocket.getInputStream().read(recv,0,recv.length);
Thread.sleep(10);
}
}
catch (InterruptedException e)
{
return;
}
catch (Exception e)
{
e.printStackTrace();
}
}
};
consumer.start();
// Send lots of messages client to server
long start = System.currentTimeMillis();
String mesg = "This is a test message to send";
for (int i = 0; i < messages; i++)
{
wsocket.getConnection().write(null,new FutureCallback<Void>(),mesg);
}
// Duration for the write phase
long writeDur = (System.currentTimeMillis() - start);
// wait for consumer to complete
while (totalB.get() < (messages * (mesg.length() + 6L)))
{
Thread.sleep(10);
}
Assert.assertThat("write duration",writeDur,greaterThan(1000L)); // writing was blocked
Assert.assertEquals(messages * (mesg.length() + 6L),totalB.get());
consumer.interrupt();
}
@Test
public void testConnectionNotAccepted() throws Exception
{
TrackingSocket wsocket = new TrackingSocket();
WebSocketClient client = factory.newWebSocketClient(wsocket);
URI wsUri = server.getWsUri();
Future<UpgradeResponse> future = client.connect(wsUri);
// Intentionally not accept incoming socket.
// server.accept();
try
{
future.get(500,TimeUnit.MILLISECONDS);
Assert.fail("Should have Timed Out");
}
catch (TimeoutException e)
{
// Expected Path
wsocket.assertNotOpened();
}
}
@Test(expected = ConnectException.class)
@Ignore("Needs work in SelectManager to support this use case")
public void testConnectionRefused() throws Exception
{
TrackingSocket wsocket = new TrackingSocket();
WebSocketClient client = factory.newWebSocketClient(wsocket);
// Intentionally bad port
URI wsUri = new URI("ws://127.0.0.1:1");
Future<UpgradeResponse> future = client.connect(wsUri);
// The attempt to get upgrade response future should throw error
try
{
future.get(1000,TimeUnit.MILLISECONDS);
Assert.fail("Expected ExecutionException -> ConnectException");
}
catch (ExecutionException e)
{
FutureCallback.rethrow(e);
}
}
@Test(expected = TimeoutException.class)
public void testConnectionTimeout() throws Exception
{
TrackingSocket wsocket = new TrackingSocket();
WebSocketClient client = factory.newWebSocketClient(wsocket);
URI wsUri = server.getWsUri();
Future<UpgradeResponse> future = client.connect(wsUri);
ServerConnection ssocket = server.accept();
Assert.assertNotNull(ssocket);
// Intentionally don't upgrade
// ssocket.upgrade();
// The attempt to get upgrade response future should throw error
try
{
future.get(500,TimeUnit.MILLISECONDS);
Assert.fail("Expected ExecutionException -> TimeoutException");
}
catch (ExecutionException e)
{
FutureCallback.rethrow(e);
}
}
@Test
@Ignore("Work In Progress")
public void testIdle() throws Exception
{
TrackingSocket wsocket = new TrackingSocket();
WebSocketClient client = factory.newWebSocketClient(wsocket);
client.getPolicy().setIdleTimeout(500);
URI wsUri = server.getWsUri();
Future<UpgradeResponse> future = client.connect(wsUri);
ServerConnection ssocket = server.accept();
ssocket.upgrade();
future.get(250,TimeUnit.MILLISECONDS);
long start = System.currentTimeMillis();
wsocket.closeLatch.await(10,TimeUnit.SECONDS);
Assert.assertTrue((System.currentTimeMillis() - start) < 5000);
wsocket.assertCloseCode(StatusCode.NORMAL);
}
@Test @Test
public void testMessageBiggerThanBufferSize() throws Exception public void testMessageBiggerThanBufferSize() throws Exception
{ {
@ -491,80 +170,4 @@ public class WebSocketClientTest
factSmall.stop(); factSmall.stop();
} }
} }
@Test
@Ignore("Work In Progress")
public void testNotIdle() throws Exception
{
TrackingSocket wsocket = new TrackingSocket();
WebSocketClient client = factory.newWebSocketClient(wsocket);
client.getPolicy().setIdleTimeout(500);
URI wsUri = server.getWsUri();
Future<UpgradeResponse> future = client.connect(wsUri);
ServerConnection ssocket = server.accept();
ssocket.upgrade();
future.get(250,TimeUnit.MILLISECONDS);
wsocket.assertIsOpen();
// Send some messages from client to server
byte[] recv = new byte[1024];
int len = -1;
for (int i = 0; i < 10; i++)
{
Thread.sleep(250);
wsocket.getConnection().write(null,new FutureCallback<Void>(),"Hello");
len = ssocket.getInputStream().read(recv,0,recv.length);
Assert.assertTrue(len > 0);
}
// Send some messages from server to client
byte[] send = new byte[]
{ (byte)0x81, (byte)0x02, (byte)'H', (byte)'i' };
for (int i = 0; i < 10; i++)
{
Thread.sleep(250);
ssocket.write(send,0,send.length);
ssocket.flush();
Assert.assertEquals("Hi",wsocket.messageQueue.poll(1,TimeUnit.SECONDS));
}
// Close with code
long start = System.currentTimeMillis();
ssocket.write(new byte[]
{ (byte)0x88, (byte)0x02, (byte)4, (byte)87 },0,4);
ssocket.flush();
wsocket.closeLatch.await(10,TimeUnit.SECONDS);
long dur = (System.currentTimeMillis() - start);
Assert.assertThat("Overall duration",dur,lessThanOrEqualTo(5000L));
wsocket.assertClose(StatusCode.PROTOCOL,"Invalid close code 1111");
}
@Test
@Ignore("Test for is-open is broken")
public void testUpgradeThenTCPClose() throws Exception
{
TrackingSocket wsocket = new TrackingSocket();
WebSocketClient client = factory.newWebSocketClient(wsocket);
URI wsUri = server.getWsUri();
Future<UpgradeResponse> future = client.connect(wsUri);
ServerConnection ssocket = server.accept();
ssocket.upgrade();
future.get(500,TimeUnit.MILLISECONDS);
wsocket.assertIsOpen();
ssocket.disconnect();
Assert.assertThat("Close should have been detected",wsocket.closeLatch.await(10,TimeUnit.SECONDS),is(true));
}
} }

View File

@ -18,6 +18,8 @@
package org.eclipse.jetty.websocket.client.blockhead; package org.eclipse.jetty.websocket.client.blockhead;
import static org.hamcrest.Matchers.*;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
@ -62,9 +64,6 @@ import org.eclipse.jetty.websocket.protocol.Parser;
import org.eclipse.jetty.websocket.protocol.WebSocketFrame; import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
import org.junit.Assert; import org.junit.Assert;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
/** /**
* A overly simplistic websocket server used during testing. * A overly simplistic websocket server used during testing.
* <p> * <p>
@ -152,6 +151,16 @@ public class BlockheadServer
getOutputStream().flush(); getOutputStream().flush();
} }
public ByteBufferPool getBufferPool()
{
return bufferPool;
}
public IncomingFramesCapture getIncomingFrames()
{
return incomingFrames;
}
public InputStream getInputStream() throws IOException public InputStream getInputStream() throws IOException
{ {
if (in == null) if (in == null)
@ -170,6 +179,16 @@ public class BlockheadServer
return out; return out;
} }
public Parser getParser()
{
return parser;
}
public WebSocketPolicy getPolicy()
{
return policy;
}
@Override @Override
public void incoming(WebSocketException e) public void incoming(WebSocketException e)
{ {

View File

@ -1,5 +1,5 @@
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
# org.eclipse.jetty.LEVEL=DEBUG org.eclipse.jetty.LEVEL=WARN
# org.eclipse.jetty.io.ChannelEndPoint.LEVEL=INFO # org.eclipse.jetty.io.ChannelEndPoint.LEVEL=INFO
org.eclipse.jetty.websocket.LEVEL=WARN org.eclipse.jetty.websocket.LEVEL=WARN
# org.eclipse.jetty.websocket.LEVEL=DEBUG # org.eclipse.jetty.websocket.LEVEL=DEBUG

View File

@ -25,6 +25,18 @@ import java.net.InetSocketAddress;
*/ */
public interface BaseConnection public interface BaseConnection
{ {
public static enum State
{
/** Connection created, but not yet connected */
OPENING,
/** Connection created and connected */
OPENED,
/** Close handshake initiated, response pending. */
CLOSING,
/** Close handshake responded. */
CLOSED
}
/** /**
* Connection suspend token * Connection suspend token
*/ */
@ -37,7 +49,7 @@ public interface BaseConnection
} }
/** /**
* Terminate connection, {@link StatusCode#NORMAL}, without a reason. * Send a websocket Close frame, {@link StatusCode#NORMAL}, without a reason.
* <p> * <p>
* Basic usage: results in an non-blocking async write, then connection close. * Basic usage: results in an non-blocking async write, then connection close.
* *
@ -47,7 +59,7 @@ public interface BaseConnection
void close(); void close();
/** /**
* Terminate connection, with status code. * Send a websocket Close frame, with status code.
* <p> * <p>
* Advanced usage: results in an non-blocking async write, then connection close. * Advanced usage: results in an non-blocking async write, then connection close.
* *
@ -59,6 +71,11 @@ public interface BaseConnection
*/ */
void close(int statusCode, String reason); void close(int statusCode, String reason);
/**
* Terminate the connection (no close frame sent)
*/
void disconnect();
/** /**
* Get the remote Address in use for this connection. * Get the remote Address in use for this connection.
* *
@ -66,6 +83,13 @@ public interface BaseConnection
*/ */
InetSocketAddress getRemoteAddress(); InetSocketAddress getRemoteAddress();
/**
* Get the state of the connection.
*
* @return the state of the connection.
*/
State getState();
/** /**
* Simple test to see if connection is open (and not closed) * Simple test to see if connection is open (and not closed)
* *
@ -80,6 +104,11 @@ public interface BaseConnection
*/ */
boolean isReading(); boolean isReading();
/**
* Notify that the connection has entered the closing handshake
*/
void notifyClosing();
/** /**
* Suspend a the incoming read events on the connection. * Suspend a the incoming read events on the connection.
* *

View File

@ -28,6 +28,7 @@ import org.eclipse.jetty.util.Utf8Appendable.NotUtf8Exception;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.annotations.WebSocket; import org.eclipse.jetty.websocket.annotations.WebSocket;
import org.eclipse.jetty.websocket.api.BaseConnection;
import org.eclipse.jetty.websocket.api.CloseException; import org.eclipse.jetty.websocket.api.CloseException;
import org.eclipse.jetty.websocket.api.StatusCode; import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.WebSocketException; import org.eclipse.jetty.websocket.api.WebSocketException;
@ -169,7 +170,22 @@ public class WebSocketEventDriver implements IncomingFrames
{ {
events.onClose.call(websocket,session,close.getStatusCode(),close.getReason()); events.onClose.call(websocket,session,close.getStatusCode(),close.getReason());
} }
throw new CloseException(close.getStatusCode(),close.getReason());
// Is this close frame a response to a prior close?
if (session.getState() == BaseConnection.State.CLOSING)
{
// Then this is close response handshake (to a prior
// outgoing close frame)
session.disconnect();
}
else
{
// This is the initiator for a close handshake
// Trigger close response handshake.
session.notifyClosing();
session.close(close.getStatusCode(),close.getReason());
}
return;
} }
case OpCode.PING: case OpCode.PING:
{ {

View File

@ -68,6 +68,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
private List<ExtensionConfig> extensions; private List<ExtensionConfig> extensions;
private boolean flushing; private boolean flushing;
private boolean isFilling; private boolean isFilling;
private BaseConnection.State connectionState;
public AbstractWebSocketConnection(EndPoint endp, Executor executor, Scheduler scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool) public AbstractWebSocketConnection(EndPoint endp, Executor executor, Scheduler scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool)
{ {
@ -80,18 +81,19 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
this.extensions = new ArrayList<>(); this.extensions = new ArrayList<>();
this.queue = new FrameQueue(); this.queue = new FrameQueue();
this.suspendToken = new AtomicBoolean(false); this.suspendToken = new AtomicBoolean(false);
this.connectionState = BaseConnection.State.OPENING;
} }
@Override @Override
public void close() public void close()
{ {
terminateConnection(StatusCode.NORMAL,null); close(StatusCode.NORMAL,null);
} }
@Override @Override
public void close(int statusCode, String reason) public void close(int statusCode, String reason)
{ {
terminateConnection(statusCode,reason); enqueClose(statusCode,reason);
} }
public <C> void complete(FrameBytes<C> frameBytes) public <C> void complete(FrameBytes<C> frameBytes)
@ -106,8 +108,15 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
} }
} }
@Override
public void disconnect()
{
disconnect(false);
}
public void disconnect(boolean onlyOutput) public void disconnect(boolean onlyOutput)
{ {
connectionState = BaseConnection.State.CLOSED;
EndPoint endPoint = getEndPoint(); EndPoint endPoint = getEndPoint();
// We need to gently close first, to allow // We need to gently close first, to allow
// SSL close alerts to be sent by Jetty // SSL close alerts to be sent by Jetty
@ -120,6 +129,24 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
} }
} }
/**
* Enqueue a close frame.
*
* @param statusCode
* the WebSocket status code.
* @param reason
* the (optional) reason string. (null is allowed)
* @see StatusCode
*/
private void enqueClose(int statusCode, String reason)
{
CloseInfo close = new CloseInfo(statusCode,reason);
FutureCallback<Void> nop = new FutureCallback<>();
ControlFrameBytes<Void> frameBytes = new ControlFrameBytes<Void>(this,nop,null,close.asFrame());
queue.append(frameBytes);
flush();
}
public void flush() public void flush()
{ {
FrameBytes<?> frameBytes = null; FrameBytes<?> frameBytes = null;
@ -153,7 +180,11 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
LOG.debug("Flushing {}, {} frame(s) in queue",frameBytes,queue.size()); LOG.debug("Flushing {}, {} frame(s) in queue",frameBytes,queue.size());
} }
} }
write(buffer,frameBytes);
if (connectionState != BaseConnection.State.CLOSED)
{
write(buffer,frameBytes);
}
} }
public ByteBufferPool getBufferPool() public ByteBufferPool getBufferPool()
@ -209,10 +240,16 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
return session; return session;
} }
@Override
public State getState()
{
return connectionState;
}
@Override @Override
public boolean isOpen() public boolean isOpen()
{ {
return getEndPoint().isOpen(); return (getState() != BaseConnection.State.CLOSED) && getEndPoint().isOpen();
} }
@Override @Override
@ -221,6 +258,19 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
return isFilling; return isFilling;
} }
@Override
public void notifyClosing()
{
this.connectionState = BaseConnection.State.CLOSING;
}
@Override
public void onClose()
{
super.onClose();
this.connectionState = BaseConnection.State.CLOSED;
}
@Override @Override
public void onFillable() public void onFillable()
{ {
@ -252,6 +302,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
public void onOpen() public void onOpen()
{ {
super.onOpen(); super.onOpen();
this.connectionState = BaseConnection.State.OPENED;
LOG.debug("fillInterested"); LOG.debug("fillInterested");
fillInterested(); fillInterested();
} }
@ -327,13 +378,13 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
catch (IOException e) catch (IOException e)
{ {
LOG.warn(e); LOG.warn(e);
terminateConnection(StatusCode.PROTOCOL,e.getMessage()); enqueClose(StatusCode.PROTOCOL,e.getMessage());
return -1; return -1;
} }
catch (CloseException e) catch (CloseException e)
{ {
LOG.warn(e); LOG.warn(e);
terminateConnection(e.getStatusCode(),e.getMessage()); enqueClose(e.getStatusCode(),e.getMessage());
return -1; return -1;
} }
} }
@ -341,7 +392,8 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
@Override @Override
public void resume() public void resume()
{ {
if(suspendToken.getAndSet(false)) { if (suspendToken.getAndSet(false))
{
fillInterested(); fillInterested();
} }
} }
@ -379,24 +431,6 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
return this; return this;
} }
/**
* For terminating connections forcefully.
*
* @param statusCode
* the WebSocket status code.
* @param reason
* the (optional) reason string. (null is allowed)
* @see StatusCode
*/
private void terminateConnection(int statusCode, String reason)
{
CloseInfo close = new CloseInfo(statusCode,reason);
FutureCallback<Void> nop = new FutureCallback<>();
ControlFrameBytes<Void> frameBytes = new ControlFrameBytes<Void>(this,nop,null,close.asFrame());
queue.append(frameBytes);
flush();
}
@Override @Override
public String toString() public String toString()
{ {
@ -411,6 +445,13 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
{ {
LOG_FRAMES.debug("{} Writing {} frame bytes of {}",policy.getBehavior(),buffer.remaining(),frameBytes); LOG_FRAMES.debug("{} Writing {} frame bytes of {}",policy.getBehavior(),buffer.remaining(),frameBytes);
} }
if (connectionState == BaseConnection.State.CLOSED)
{
// connection is closed, STOP WRITING, geez.
return;
}
try try
{ {
endpoint.write(frameBytes.context,frameBytes,buffer); endpoint.write(frameBytes.context,frameBytes,buffer);

View File

@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.BaseConnection;
import org.eclipse.jetty.websocket.protocol.OpCode; import org.eclipse.jetty.websocket.protocol.OpCode;
import org.eclipse.jetty.websocket.protocol.WebSocketFrame; import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
@ -45,8 +46,17 @@ public class ControlFrameBytes<C> extends FrameBytes<C>
if (frame.getOpCode() == OpCode.CLOSE) if (frame.getOpCode() == OpCode.CLOSE)
{ {
// Disconnect the connection (no more packets/frames) // is this outgoing close frame a response to a close?
connection.disconnect(false); if (connection.getState() == BaseConnection.State.CLOSING)
{
// Disconnect the connection (no more packets/frames)
connection.disconnect(false);
}
else
{
// Then this is the initiator for a close handshake.
connection.notifyClosing();
}
} }
else else
{ {

View File

@ -20,7 +20,6 @@ package org.eclipse.jetty.websocket.io;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.InterruptedByTimeoutException; import java.nio.channels.InterruptedByTimeoutException;
import java.util.concurrent.ScheduledFuture;
import org.eclipse.jetty.io.EofException; import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
@ -51,7 +50,9 @@ public abstract class FrameBytes<C> implements Callback<C>, Runnable
{ {
Scheduler.Task task = this.task; Scheduler.Task task = this.task;
if (task != null) if (task != null)
{
task.cancel(); task.cancel();
}
} }
@Override @Override

View File

@ -37,6 +37,7 @@ import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
public class WebSocketSession implements WebSocketConnection, IncomingFrames, OutgoingFrames public class WebSocketSession implements WebSocketConnection, IncomingFrames, OutgoingFrames
{ {
private static final Logger LOG = Log.getLogger(WebSocketSession.class); private static final Logger LOG = Log.getLogger(WebSocketSession.class);
/** /**
* The reference to the base connection. * The reference to the base connection.
* <p> * <p>
@ -69,6 +70,12 @@ public class WebSocketSession implements WebSocketConnection, IncomingFrames, Ou
baseConnection.close(statusCode,reason); baseConnection.close(statusCode,reason);
} }
@Override
public void disconnect()
{
baseConnection.disconnect();
}
public IncomingFrames getIncoming() public IncomingFrames getIncoming()
{ {
return websocket; return websocket;
@ -91,6 +98,12 @@ public class WebSocketSession implements WebSocketConnection, IncomingFrames, Ou
return baseConnection.getRemoteAddress(); return baseConnection.getRemoteAddress();
} }
@Override
public BaseConnection.State getState()
{
return baseConnection.getState();
}
@Override @Override
public String getSubProtocol() public String getSubProtocol()
{ {
@ -123,6 +136,12 @@ public class WebSocketSession implements WebSocketConnection, IncomingFrames, Ou
return baseConnection.isReading(); return baseConnection.isReading();
} }
@Override
public void notifyClosing()
{
baseConnection.notifyClosing();
}
public void onConnect() public void onConnect()
{ {
LOG.debug("onConnect()"); LOG.debug("onConnect()");

View File

@ -56,6 +56,11 @@ public class LocalWebSocketConnection implements WebSocketConnection
{ {
} }
@Override
public void disconnect()
{
}
@Override @Override
public WebSocketPolicy getPolicy() public WebSocketPolicy getPolicy()
{ {
@ -68,6 +73,12 @@ public class LocalWebSocketConnection implements WebSocketConnection
return null; return null;
} }
@Override
public State getState()
{
return null;
}
@Override @Override
public String getSubProtocol() public String getSubProtocol()
{ {
@ -86,10 +97,14 @@ public class LocalWebSocketConnection implements WebSocketConnection
return false; return false;
} }
@Override
public void notifyClosing()
{
}
@Override @Override
public <C> void ping(C context, Callback<C> callback, byte[] payload) throws IOException public <C> void ping(C context, Callback<C> callback, byte[] payload) throws IOException
{ {
} }
@Override @Override

View File

@ -11,7 +11,7 @@ org.eclipse.jetty.websocket.server.helper.RFCSocket.LEVEL=OFF
# org.eclipse.jetty.util.thread.QueuedThreadPool.LEVEL=DEBUG # org.eclipse.jetty.util.thread.QueuedThreadPool.LEVEL=DEBUG
# org.eclipse.jetty.io.SelectorManager.LEVEL=INFO # org.eclipse.jetty.io.SelectorManager.LEVEL=INFO
# org.eclipse.jetty.websocket.LEVEL=DEBUG # org.eclipse.jetty.websocket.LEVEL=DEBUG
# org.eclipse.jetty.websocket.io.FrameBytes.LEVEL=DEBUG org.eclipse.jetty.websocket.io.FrameBytes.LEVEL=OFF
# org.eclipse.jetty.websocket.io.ControlFrameBytes.LEVEL=DEBUG # org.eclipse.jetty.websocket.io.ControlFrameBytes.LEVEL=DEBUG
# org.eclipse.jetty.websocket.driver.WebSocketEventDriver.LEVEL=DEBUG # org.eclipse.jetty.websocket.driver.WebSocketEventDriver.LEVEL=DEBUG
# org.eclipse.jetty.websocket.extensions.LEVEL=DEBUG # org.eclipse.jetty.websocket.extensions.LEVEL=DEBUG