More work around close handshake testing

This commit is contained in:
Joakim Erdfelt 2012-09-04 14:53:34 -07:00
parent 4fce4d0e53
commit 4f36513744
20 changed files with 1291 additions and 442 deletions

View File

@ -0,0 +1,133 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.client;
import static org.hamcrest.Matchers.*;
import java.net.URI;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.api.WebSocketConnection;
import org.eclipse.jetty.websocket.client.blockhead.BlockheadServer;
import org.eclipse.jetty.websocket.client.blockhead.BlockheadServer.ServerConnection;
import org.eclipse.jetty.websocket.io.AbstractWebSocketConnection;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
/**
* Tests for conditions due to bad networking.
*/
@Ignore("Not working yet")
public class BadNetworkTest
{
private BlockheadServer server;
private WebSocketClientFactory factory;
@Before
public void startFactory() throws Exception
{
factory = new WebSocketClientFactory();
factory.getPolicy().setIdleTimeout(250);
factory.start();
}
@Before
public void startServer() throws Exception
{
server = new BlockheadServer();
server.start();
}
@After
public void stopFactory() throws Exception
{
factory.stop();
}
@After
public void stopServer() throws Exception
{
server.stop();
}
@Test
public void testAbruptClientClose() throws Exception
{
TrackingSocket wsocket = new TrackingSocket();
WebSocketClient client = factory.newWebSocketClient(wsocket);
URI wsUri = server.getWsUri();
Future<UpgradeResponse> future = client.connect(wsUri);
ServerConnection ssocket = server.accept();
ssocket.upgrade();
// Validate that we are connected
future.get(500,TimeUnit.MILLISECONDS);
wsocket.waitForConnected(500,TimeUnit.MILLISECONDS);
// Have client disconnect abruptly
WebSocketConnection conn = wsocket.getConnection();
Assert.assertThat("Connection",conn,instanceOf(AbstractWebSocketConnection.class));
AbstractWebSocketConnection awsc = (AbstractWebSocketConnection)conn;
awsc.disconnect(false);
// Client Socket should see close
wsocket.waitForClose(10,TimeUnit.SECONDS);
// Client Socket should see a close event, with status NO_CLOSE
// This event is automatically supplied by the underlying WebSocketClientConnection
// in the situation of a bad network connection.
wsocket.assertCloseCode(StatusCode.NO_CLOSE);
}
@Test
public void testAbruptServerClose() throws Exception
{
TrackingSocket wsocket = new TrackingSocket();
WebSocketClient client = factory.newWebSocketClient(wsocket);
URI wsUri = server.getWsUri();
Future<UpgradeResponse> future = client.connect(wsUri);
ServerConnection ssocket = server.accept();
ssocket.upgrade();
// Validate that we are connected
future.get(500,TimeUnit.MILLISECONDS);
wsocket.waitForConnected(500,TimeUnit.MILLISECONDS);
// Have server disconnect abruptly
ssocket.disconnect();
// Wait for close
wsocket.waitForClose(10,TimeUnit.SECONDS);
// Client Socket should see a close event, with status NO_CLOSE
// This event is automatically supplied by the underlying WebSocketClientConnection
// in the situation of a bad network connection.
wsocket.assertCloseCode(StatusCode.NO_CLOSE);
}
}

View File

@ -0,0 +1,201 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.client;
import java.net.ConnectException;
import java.net.URI;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.websocket.api.UpgradeException;
import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.client.blockhead.BlockheadServer;
import org.eclipse.jetty.websocket.client.blockhead.BlockheadServer.ServerConnection;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
/**
* Various connect condition testing
*/
public class ClientConnectTest
{
private BlockheadServer server;
private WebSocketClientFactory factory;
@Before
public void startFactory() throws Exception
{
factory = new WebSocketClientFactory();
factory.start();
}
@Before
public void startServer() throws Exception
{
server = new BlockheadServer();
server.start();
}
@After
public void stopFactory() throws Exception
{
factory.stop();
}
@After
public void stopServer() throws Exception
{
server.stop();
}
@Test(expected = UpgradeException.class)
public void testBadHandshake() throws Exception
{
TrackingSocket wsocket = new TrackingSocket();
WebSocketClient client = factory.newWebSocketClient(wsocket);
URI wsUri = server.getWsUri();
FutureCallback<UpgradeResponse> future = client.connect(wsUri);
ServerConnection connection = server.accept();
connection.readRequest();
// no upgrade, just fail with a 404 error
connection.respond("HTTP/1.1 404 NOT FOUND\r\n\r\n");
// The attempt to get upgrade response future should throw error
try
{
future.get(500,TimeUnit.MILLISECONDS);
Assert.fail("Expected ExecutionException -> UpgradeException");
}
catch (ExecutionException e)
{
// Expected Path - throw underlying exception
FutureCallback.rethrow(e);
}
}
@Test(expected = UpgradeException.class)
public void testBadUpgrade() throws Exception
{
TrackingSocket wsocket = new TrackingSocket();
WebSocketClient client = factory.newWebSocketClient(wsocket);
URI wsUri = server.getWsUri();
FutureCallback<UpgradeResponse> future = client.connect(wsUri);
ServerConnection connection = server.accept();
connection.readRequest();
// Upgrade badly
connection.respond("HTTP/1.1 101 Upgrade\r\n" + "Sec-WebSocket-Accept: rubbish\r\n" + "\r\n");
// The attempt to get upgrade response future should throw error
try
{
future.get(500,TimeUnit.MILLISECONDS);
Assert.fail("Expected ExecutionException -> UpgradeException");
}
catch (ExecutionException e)
{
// Expected Path - throw underlying exception
FutureCallback.rethrow(e);
}
}
@Test
public void testConnectionNotAccepted() throws Exception
{
TrackingSocket wsocket = new TrackingSocket();
WebSocketClient client = factory.newWebSocketClient(wsocket);
URI wsUri = server.getWsUri();
Future<UpgradeResponse> future = client.connect(wsUri);
// Intentionally not accept incoming socket.
// server.accept();
try
{
future.get(500,TimeUnit.MILLISECONDS);
Assert.fail("Should have Timed Out");
}
catch (TimeoutException e)
{
// Expected Path
wsocket.assertNotOpened();
}
}
@Test(expected = ConnectException.class)
@Ignore("Need to get information about connection issue out of SelectManager somehow")
public void testConnectionRefused() throws Exception
{
TrackingSocket wsocket = new TrackingSocket();
WebSocketClient client = factory.newWebSocketClient(wsocket);
// Intentionally bad port
URI wsUri = new URI("ws://127.0.0.1:1");
Future<UpgradeResponse> future = client.connect(wsUri);
// The attempt to get upgrade response future should throw error
try
{
future.get(1000,TimeUnit.MILLISECONDS);
Assert.fail("Expected ExecutionException -> ConnectException");
}
catch (ExecutionException e)
{
// Expected Path - throw underlying exception
FutureCallback.rethrow(e);
}
}
@Test(expected = TimeoutException.class)
public void testConnectionTimeout() throws Exception
{
TrackingSocket wsocket = new TrackingSocket();
WebSocketClient client = factory.newWebSocketClient(wsocket);
URI wsUri = server.getWsUri();
Future<UpgradeResponse> future = client.connect(wsUri);
ServerConnection ssocket = server.accept();
Assert.assertNotNull(ssocket);
// Intentionally don't upgrade
// ssocket.upgrade();
// The attempt to get upgrade response future should throw error
try
{
future.get(500,TimeUnit.MILLISECONDS);
Assert.fail("Expected ExecutionException -> TimeoutException");
}
catch (ExecutionException e)
{
// Expected Path - throw underlying exception
FutureCallback.rethrow(e);
}
}
}

View File

@ -0,0 +1,115 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.client;
import java.io.IOException;
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.WebSocketConnection;
public class ClientWriteThread extends Thread
{
private static final Logger LOG = Log.getLogger(ClientWriteThread.class);
private final WebSocketConnection conn;
private Exchanger<String> exchanger;
private int slowness = -1;
private int messageCount = 100;
private String message = "Hello";
public ClientWriteThread(WebSocketConnection conn)
{
this.conn = conn;
}
public Exchanger<String> getExchanger()
{
return exchanger;
}
public String getMessage()
{
return message;
}
public int getMessageCount()
{
return messageCount;
}
public int getSlowness()
{
return slowness;
}
@Override
public void run()
{
final AtomicInteger m = new AtomicInteger();
try
{
while (m.get() < messageCount)
{
conn.write(null,new FutureCallback<Void>(),message);
if (exchanger != null)
{
// synchronized on exchange
exchanger.exchange(message);
}
m.incrementAndGet();
if (slowness > 0)
{
TimeUnit.MILLISECONDS.sleep(slowness);
}
}
}
catch (InterruptedException | IOException e)
{
LOG.warn(e);
}
}
public void setExchanger(Exchanger<String> exchanger)
{
this.exchanger = exchanger;
}
public void setMessage(String message)
{
this.message = message;
}
public void setMessageCount(int messageCount)
{
this.messageCount = messageCount;
}
public void setSlowness(int slowness)
{
this.slowness = slowness;
}
}

View File

@ -0,0 +1,140 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.client;
import static org.hamcrest.Matchers.*;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.client.blockhead.BlockheadServer.ServerConnection;
import org.eclipse.jetty.websocket.protocol.CloseInfo;
import org.eclipse.jetty.websocket.protocol.OpCode;
import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
import org.junit.Assert;
public class ServerReadThread extends Thread
{
private static final Logger LOG = Log.getLogger(ServerReadThread.class);
private final ServerConnection conn;
private boolean active = true;
private int slowness = -1; // disabled is default
private AtomicInteger frameCount = new AtomicInteger();
private CountDownLatch expectedMessageCount;
public ServerReadThread(ServerConnection conn)
{
this.conn = conn;
this.expectedMessageCount = new CountDownLatch(1);
}
public void cancel()
{
active = false;
}
public int getFrameCount()
{
return frameCount.get();
}
public int getSlowness()
{
return slowness;
}
@Override
public void run()
{
ByteBufferPool bufferPool = conn.getBufferPool();
WebSocketPolicy policy = conn.getPolicy();
ByteBuffer buf = bufferPool.acquire(policy.getBufferSize(),false);
BufferUtil.clearToFill(buf);
int len = 0;
try
{
while (active)
{
BufferUtil.clearToFill(buf);
len = conn.read(buf);
if (len > 0)
{
LOG.debug("Read {} bytes",len);
BufferUtil.flipToFlush(buf,0);
conn.getParser().parse(buf);
}
LinkedList<WebSocketFrame> frames = conn.getIncomingFrames().getFrames();
WebSocketFrame frame;
while ((frame = frames.poll()) != null)
{
frameCount.incrementAndGet();
if (frame.getOpCode() == OpCode.CLOSE)
{
active = false;
// automatically response to close frame
CloseInfo close = new CloseInfo(frame);
conn.close(close.getStatusCode());
}
expectedMessageCount.countDown();
}
if (slowness > 0)
{
TimeUnit.MILLISECONDS.sleep(slowness);
}
}
}
catch (IOException | InterruptedException e)
{
LOG.warn(e);
}
finally
{
bufferPool.release(buf);
}
}
public void setExpectedMessageCount(int expectedMessageCount)
{
this.expectedMessageCount = new CountDownLatch(expectedMessageCount);
}
public void setSlowness(int slowness)
{
this.slowness = slowness;
}
public void waitForExpectedMessageCount(int timeoutDuration, TimeUnit timeoutUnit) throws InterruptedException
{
Assert.assertThat("Expected Message Count attained",expectedMessageCount.await(timeoutDuration,timeoutUnit),is(true));
}
}

View File

@ -0,0 +1,115 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.client;
import java.io.IOException;
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.client.blockhead.BlockheadServer.ServerConnection;
import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
public class ServerWriteThread extends Thread
{
private static final Logger LOG = Log.getLogger(ServerWriteThread.class);
private final ServerConnection conn;
private Exchanger<String> exchanger;
private int slowness = -1;
private int messageCount = 100;
private String message = "Hello";
public ServerWriteThread(ServerConnection conn)
{
this.conn = conn;
}
public Exchanger<String> getExchanger()
{
return exchanger;
}
public String getMessage()
{
return message;
}
public int getMessageCount()
{
return messageCount;
}
public int getSlowness()
{
return slowness;
}
@Override
public void run()
{
final AtomicInteger m = new AtomicInteger();
try
{
while (m.get() < messageCount)
{
conn.write(WebSocketFrame.text(message));
if (exchanger != null)
{
// synchronized on exchange
exchanger.exchange(message);
}
m.incrementAndGet();
if (slowness > 0)
{
TimeUnit.MILLISECONDS.sleep(slowness);
}
}
}
catch (InterruptedException | IOException e)
{
LOG.warn(e);
}
}
public void setExchanger(Exchanger<String> exchanger)
{
this.exchanger = exchanger;
}
public void setMessage(String message)
{
this.message = message;
}
public void setMessageCount(int messageCount)
{
this.messageCount = messageCount;
}
public void setSlowness(int slowness)
{
this.slowness = slowness;
}
}

View File

@ -0,0 +1,117 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.client;
import static org.hamcrest.Matchers.*;
import java.net.URI;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.client.blockhead.BlockheadServer;
import org.eclipse.jetty.websocket.client.blockhead.BlockheadServer.ServerConnection;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class SlowClientTest
{
private BlockheadServer server;
private WebSocketClientFactory factory;
@Before
public void startFactory() throws Exception
{
factory = new WebSocketClientFactory();
factory.getPolicy().setIdleTimeout(60000);
factory.start();
}
@Before
public void startServer() throws Exception
{
server = new BlockheadServer();
server.start();
}
@After
public void stopFactory() throws Exception
{
factory.stop();
}
@After
public void stopServer() throws Exception
{
server.stop();
}
@Test
@Slow
public void testClientSlowToSend() throws Exception
{
// final Exchanger<String> exchanger = new Exchanger<String>();
TrackingSocket tsocket = new TrackingSocket();
// tsocket.messageExchanger = exchanger;
WebSocketClient client = factory.newWebSocketClient(tsocket);
client.getPolicy().setIdleTimeout(60000);
URI wsUri = server.getWsUri();
Future<UpgradeResponse> future = client.connect(wsUri);
ServerConnection sconnection = server.accept();
sconnection.setSoTimeout(60000);
sconnection.upgrade();
// Confirm connected
future.get(500,TimeUnit.MILLISECONDS);
tsocket.waitForConnected(500,TimeUnit.MILLISECONDS);
// Setup server read thread
ServerReadThread reader = new ServerReadThread(sconnection);
reader.setExpectedMessageCount(Integer.MAX_VALUE); // keep reading till I tell you to stop
reader.start();
// Have client write slowly.
int messageCount = 1000;
ClientWriteThread writer = new ClientWriteThread(tsocket.getConnection());
writer.setMessageCount(messageCount);
writer.setMessage("Hello");
// writer.setExchanger(exchanger);
writer.setSlowness(50);
writer.start();
writer.join();
// Verify receive
Assert.assertThat("Frame Receive Count",reader.getFrameCount(),is(messageCount));
// Close
tsocket.getConnection().close(StatusCode.NORMAL,"Done");
Assert.assertTrue("Client Socket Closed",tsocket.closeLatch.await(10,TimeUnit.SECONDS));
tsocket.assertCloseCode(StatusCode.NORMAL);
reader.cancel(); // stop reading
}
}

View File

@ -0,0 +1,161 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.client;
import static org.hamcrest.Matchers.*;
import java.net.URI;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.client.blockhead.BlockheadServer;
import org.eclipse.jetty.websocket.client.blockhead.BlockheadServer.ServerConnection;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class SlowServerTest
{
private BlockheadServer server;
private WebSocketClientFactory factory;
@Before
public void startFactory() throws Exception
{
factory = new WebSocketClientFactory();
factory.getPolicy().setIdleTimeout(60000);
factory.start();
}
@Before
public void startServer() throws Exception
{
server = new BlockheadServer();
server.start();
}
@After
public void stopFactory() throws Exception
{
factory.stop();
}
@After
public void stopServer() throws Exception
{
server.stop();
}
@Test
@Slow
public void testServerSlowToRead() throws Exception
{
// final Exchanger<String> exchanger = new Exchanger<String>();
TrackingSocket tsocket = new TrackingSocket();
// tsocket.messageExchanger = exchanger;
WebSocketClient client = factory.newWebSocketClient(tsocket);
client.getPolicy().setIdleTimeout(60000);
URI wsUri = server.getWsUri();
Future<UpgradeResponse> future = client.connect(wsUri);
ServerConnection sconnection = server.accept();
sconnection.setSoTimeout(60000);
sconnection.upgrade();
// Confirm connected
future.get(500,TimeUnit.MILLISECONDS);
tsocket.waitForConnected(500,TimeUnit.MILLISECONDS);
int messageCount = 10; // TODO: increase to 1000
// Setup slow server read thread
ServerReadThread reader = new ServerReadThread(sconnection);
reader.setExpectedMessageCount(messageCount);
reader.setSlowness(100); // slow it down
reader.start();
// Have client write as quickly as it can.
ClientWriteThread writer = new ClientWriteThread(tsocket.getConnection());
writer.setMessageCount(messageCount);
writer.setMessage("Hello");
// writer.setExchanger(exchanger);
writer.setSlowness(-1); // disable slowness
writer.start();
writer.join();
// Verify receive
reader.waitForExpectedMessageCount(10,TimeUnit.SECONDS);
Assert.assertThat("Frame Receive Count",reader.getFrameCount(),is(messageCount));
// Close
tsocket.getConnection().close(StatusCode.NORMAL,"Done");
Assert.assertTrue("Client Socket Closed",tsocket.closeLatch.await(10,TimeUnit.SECONDS));
tsocket.assertCloseCode(StatusCode.NORMAL);
reader.cancel(); // stop reading
}
@Test
@Slow
public void testServerSlowToSend() throws Exception
{
// final Exchanger<String> exchanger = new Exchanger<String>();
TrackingSocket tsocket = new TrackingSocket();
// tsocket.messageExchanger = exchanger;
WebSocketClient client = factory.newWebSocketClient(tsocket);
client.getPolicy().setIdleTimeout(60000);
URI wsUri = server.getWsUri();
Future<UpgradeResponse> future = client.connect(wsUri);
ServerConnection sconnection = server.accept();
sconnection.setSoTimeout(60000);
sconnection.upgrade();
// Confirm connected
future.get(500,TimeUnit.MILLISECONDS);
tsocket.waitForConnected(500,TimeUnit.MILLISECONDS);
// Have server write slowly.
int messageCount = 1000;
ServerWriteThread writer = new ServerWriteThread(sconnection);
writer.setMessageCount(messageCount);
writer.setMessage("Hello");
// writer.setExchanger(exchanger);
writer.setSlowness(50);
writer.start();
writer.join();
// Verify receive
Assert.assertThat("Message Receive Count",tsocket.messageQueue.size(),is(messageCount));
// Close
sconnection.close(StatusCode.NORMAL);
Assert.assertTrue("Client Socket Closed",tsocket.closeLatch.await(10,TimeUnit.SECONDS));
tsocket.assertCloseCode(StatusCode.NORMAL);
}
}

View File

@ -0,0 +1,105 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.client;
import static org.hamcrest.Matchers.*;
import java.net.URI;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.client.blockhead.BlockheadServer;
import org.eclipse.jetty.websocket.client.blockhead.BlockheadServer.ServerConnection;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
/**
* Various tests for Timeout handling
*/
@Ignore("Idle timeouts not working yet")
public class TimeoutTest
{
private BlockheadServer server;
private WebSocketClientFactory factory;
@Before
public void startFactory() throws Exception
{
factory = new WebSocketClientFactory();
factory.getPolicy().setIdleTimeout(250); // idle timeout (for all tests here)
factory.start();
}
@Before
public void startServer() throws Exception
{
server = new BlockheadServer();
server.start();
}
@After
public void stopFactory() throws Exception
{
factory.stop();
}
@After
public void stopServer() throws Exception
{
server.stop();
}
/**
* In a situation where the upgrade/connection is successfull, and there is no activity for a while, the idle timeout triggers on the client side and
* automatically initiates a close handshake.
*/
@Test
public void testIdleDetectedByClient() throws Exception
{
TrackingSocket wsocket = new TrackingSocket();
WebSocketClient client = factory.newWebSocketClient(wsocket);
URI wsUri = server.getWsUri();
Future<UpgradeResponse> future = client.connect(wsUri);
ServerConnection ssocket = server.accept();
ssocket.upgrade();
// Validate that connect occurred
future.get(500,TimeUnit.MILLISECONDS);
wsocket.waitForConnected(500,TimeUnit.MILLISECONDS);
// Wait for inactivity idle timeout.
long start = System.currentTimeMillis();
wsocket.waitForClose(10,TimeUnit.SECONDS);
long end = System.currentTimeMillis();
long dur = (end - start);
// Make sure idle timeout takes less than 5 total seconds
Assert.assertThat("Idle Timeout",dur,lessThanOrEqualTo(5000L));
// Client should see a close event, with status NO_CLOSE
wsocket.assertCloseCode(StatusCode.NORMAL);
}
}

View File

@ -18,6 +18,8 @@
package org.eclipse.jetty.websocket.client;
import static org.hamcrest.Matchers.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Exchanger;
@ -31,9 +33,6 @@ import org.eclipse.jetty.websocket.api.WebSocketAdapter;
import org.eclipse.jetty.websocket.api.WebSocketConnection;
import org.junit.Assert;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
/**
* Testing Socket used on client side WebSocket testing.
*/
@ -57,7 +56,7 @@ public class TrackingSocket extends WebSocketAdapter
public void assertCloseCode(int expectedCode) throws InterruptedException
{
Assert.assertThat("Was Closed",closeLatch.await(500,TimeUnit.MILLISECONDS),is(true));
Assert.assertThat("Was Closed",closeLatch.await(50,TimeUnit.MILLISECONDS),is(true));
Assert.assertThat("Close Code",closeCode,is(expectedCode));
}
@ -163,7 +162,17 @@ public class TrackingSocket extends WebSocketAdapter
}
}
public void waitForMessage(TimeUnit timeoutUnit, int timeoutDuration) throws InterruptedException
public void waitForClose(int timeoutDuration, TimeUnit timeoutUnit) throws InterruptedException
{
Assert.assertThat("Client Socket Closed",closeLatch.await(timeoutDuration,timeoutUnit),is(true));
}
public void waitForConnected(int timeoutDuration, TimeUnit timeoutUnit) throws InterruptedException
{
Assert.assertThat("Client Socket Connected",openLatch.await(timeoutDuration,timeoutUnit),is(true));
}
public void waitForMessage(int timeoutDuration, TimeUnit timeoutUnit) throws InterruptedException
{
Assert.assertThat("Message Received",dataLatch.await(timeoutDuration,timeoutUnit),is(true));
}

View File

@ -18,20 +18,14 @@
package org.eclipse.jetty.websocket.client;
import java.net.ConnectException;
import static org.hamcrest.Matchers.*;
import java.net.URI;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.toolchain.test.AdvancedRunner;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.UpgradeException;
import org.eclipse.jetty.websocket.api.UpgradeRequest;
import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.client.blockhead.BlockheadServer;
@ -40,14 +34,8 @@ import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
@RunWith(AdvancedRunner.class)
public class WebSocketClientTest
@ -81,58 +69,6 @@ public class WebSocketClientTest
server.stop();
}
@Test(expected = UpgradeException.class)
public void testBadHandshake() throws Exception
{
TrackingSocket wsocket = new TrackingSocket();
WebSocketClient client = factory.newWebSocketClient(wsocket);
URI wsUri = server.getWsUri();
FutureCallback<UpgradeResponse> future = client.connect(wsUri);
ServerConnection connection = server.accept();
connection.readRequest();
// no upgrade, just fail with a 404 error
connection.respond("HTTP/1.1 404 NOT FOUND\r\n\r\n");
// The attempt to get upgrade response future should throw error
try
{
future.get(500,TimeUnit.MILLISECONDS);
Assert.fail("Expected ExecutionException -> UpgradeException");
}
catch (ExecutionException e)
{
FutureCallback.rethrow(e);
}
}
@Test(expected = UpgradeException.class)
public void testBadUpgrade() throws Exception
{
TrackingSocket wsocket = new TrackingSocket();
WebSocketClient client = factory.newWebSocketClient(wsocket);
URI wsUri = server.getWsUri();
FutureCallback<UpgradeResponse> future = client.connect(wsUri);
ServerConnection connection = server.accept();
connection.readRequest();
// Upgrade badly
connection.respond("HTTP/1.1 101 Upgrade\r\n" + "Sec-WebSocket-Accept: rubbish\r\n" + "\r\n");
// The attempt to get upgrade response future should throw error
try
{
future.get(500,TimeUnit.MILLISECONDS);
Assert.fail("Expected ExecutionException -> UpgradeException");
}
catch (ExecutionException e)
{
FutureCallback.rethrow(e);
}
}
@Test
public void testBasicEcho_FromClient() throws Exception
{
@ -161,7 +97,7 @@ public class WebSocketClientTest
cliSock.getConnection().write(null,new FutureCallback<Void>(),"Hello World!");
srvSock.echoMessage(1,TimeUnit.MILLISECONDS,500);
// wait for response from server
cliSock.waitForMessage(TimeUnit.MILLISECONDS,500);
cliSock.waitForMessage(500,TimeUnit.MILLISECONDS);
cliSock.assertMessage("Hello World!");
}
@ -193,263 +129,6 @@ public class WebSocketClientTest
wsocket.assertMessage("Hello World");
}
@Test
public void testBlockReceiving() throws Exception
{
final Exchanger<String> exchanger = new Exchanger<String>();
TrackingSocket tsocket = new TrackingSocket();
tsocket.messageExchanger = exchanger;
WebSocketClient client = factory.newWebSocketClient(tsocket);
client.getPolicy().setIdleTimeout(60000);
URI wsUri = server.getWsUri();
Future<UpgradeResponse> future = client.connect(wsUri);
ServerConnection sconnection = server.accept();
sconnection.setSoTimeout(60000);
sconnection.upgrade();
future.get(500,TimeUnit.MILLISECONDS);
// define some messages to send server to client
byte[] send = new byte[]
{ (byte)0x81, (byte)0x05, (byte)'H', (byte)'e', (byte)'l', (byte)'l', (byte)'o' };
final int messages = 100000;
final AtomicInteger m = new AtomicInteger();
// Set up a consumer of received messages that waits a while before consuming
Thread consumer = new Thread()
{
@Override
public void run()
{
try
{
Thread.sleep(200);
while (m.get() < messages)
{
String msg = exchanger.exchange(null);
if ("Hello".equals(msg))
{
m.incrementAndGet();
}
else
{
throw new IllegalStateException("exchanged " + msg);
}
if ((m.get() % 1000) == 0)
{
// Artificially slow reader
Thread.sleep(10);
}
}
}
catch (InterruptedException e)
{
return;
}
catch (Exception e)
{
e.printStackTrace();
}
}
};
consumer.start();
long start = System.currentTimeMillis();
for (int i = 0; i < messages; i++)
{
sconnection.write(send,0,send.length);
sconnection.flush();
}
while (consumer.isAlive())
{
Thread.sleep(10);
}
// Duration of the read operation.
long readDur = (System.currentTimeMillis() - start);
Assert.assertThat("read duration",readDur,greaterThan(1000L)); // reading was blocked
Assert.assertEquals(m.get(),messages);
// Close with code
sconnection.close(StatusCode.NORMAL);
Assert.assertTrue("Client Socket Closed",tsocket.closeLatch.await(10,TimeUnit.SECONDS));
tsocket.assertCloseCode(StatusCode.NORMAL);
}
@Test
@Ignore("Not working, it hangs")
public void testBlockSending() throws Exception
{
TrackingSocket wsocket = new TrackingSocket();
WebSocketClient client = factory.newWebSocketClient(wsocket);
client.getPolicy().setIdleTimeout(10000);
URI wsUri = server.getWsUri();
Future<UpgradeResponse> future = client.connect(wsUri);
final ServerConnection ssocket = server.accept();
ssocket.upgrade();
future.get(250,TimeUnit.MILLISECONDS);
final int messages = 200000;
final AtomicLong totalB = new AtomicLong();
Thread consumer = new Thread()
{
@Override
public void run()
{
// Thread.sleep is for artificially poor performance reader needed for this testcase.
try
{
Thread.sleep(200);
byte[] recv = new byte[32 * 1024];
int len = 0;
while (len >= 0)
{
totalB.addAndGet(len);
len = ssocket.getInputStream().read(recv,0,recv.length);
Thread.sleep(10);
}
}
catch (InterruptedException e)
{
return;
}
catch (Exception e)
{
e.printStackTrace();
}
}
};
consumer.start();
// Send lots of messages client to server
long start = System.currentTimeMillis();
String mesg = "This is a test message to send";
for (int i = 0; i < messages; i++)
{
wsocket.getConnection().write(null,new FutureCallback<Void>(),mesg);
}
// Duration for the write phase
long writeDur = (System.currentTimeMillis() - start);
// wait for consumer to complete
while (totalB.get() < (messages * (mesg.length() + 6L)))
{
Thread.sleep(10);
}
Assert.assertThat("write duration",writeDur,greaterThan(1000L)); // writing was blocked
Assert.assertEquals(messages * (mesg.length() + 6L),totalB.get());
consumer.interrupt();
}
@Test
public void testConnectionNotAccepted() throws Exception
{
TrackingSocket wsocket = new TrackingSocket();
WebSocketClient client = factory.newWebSocketClient(wsocket);
URI wsUri = server.getWsUri();
Future<UpgradeResponse> future = client.connect(wsUri);
// Intentionally not accept incoming socket.
// server.accept();
try
{
future.get(500,TimeUnit.MILLISECONDS);
Assert.fail("Should have Timed Out");
}
catch (TimeoutException e)
{
// Expected Path
wsocket.assertNotOpened();
}
}
@Test(expected = ConnectException.class)
@Ignore("Needs work in SelectManager to support this use case")
public void testConnectionRefused() throws Exception
{
TrackingSocket wsocket = new TrackingSocket();
WebSocketClient client = factory.newWebSocketClient(wsocket);
// Intentionally bad port
URI wsUri = new URI("ws://127.0.0.1:1");
Future<UpgradeResponse> future = client.connect(wsUri);
// The attempt to get upgrade response future should throw error
try
{
future.get(1000,TimeUnit.MILLISECONDS);
Assert.fail("Expected ExecutionException -> ConnectException");
}
catch (ExecutionException e)
{
FutureCallback.rethrow(e);
}
}
@Test(expected = TimeoutException.class)
public void testConnectionTimeout() throws Exception
{
TrackingSocket wsocket = new TrackingSocket();
WebSocketClient client = factory.newWebSocketClient(wsocket);
URI wsUri = server.getWsUri();
Future<UpgradeResponse> future = client.connect(wsUri);
ServerConnection ssocket = server.accept();
Assert.assertNotNull(ssocket);
// Intentionally don't upgrade
// ssocket.upgrade();
// The attempt to get upgrade response future should throw error
try
{
future.get(500,TimeUnit.MILLISECONDS);
Assert.fail("Expected ExecutionException -> TimeoutException");
}
catch (ExecutionException e)
{
FutureCallback.rethrow(e);
}
}
@Test
@Ignore("Work In Progress")
public void testIdle() throws Exception
{
TrackingSocket wsocket = new TrackingSocket();
WebSocketClient client = factory.newWebSocketClient(wsocket);
client.getPolicy().setIdleTimeout(500);
URI wsUri = server.getWsUri();
Future<UpgradeResponse> future = client.connect(wsUri);
ServerConnection ssocket = server.accept();
ssocket.upgrade();
future.get(250,TimeUnit.MILLISECONDS);
long start = System.currentTimeMillis();
wsocket.closeLatch.await(10,TimeUnit.SECONDS);
Assert.assertTrue((System.currentTimeMillis() - start) < 5000);
wsocket.assertCloseCode(StatusCode.NORMAL);
}
@Test
public void testMessageBiggerThanBufferSize() throws Exception
{
@ -491,80 +170,4 @@ public class WebSocketClientTest
factSmall.stop();
}
}
@Test
@Ignore("Work In Progress")
public void testNotIdle() throws Exception
{
TrackingSocket wsocket = new TrackingSocket();
WebSocketClient client = factory.newWebSocketClient(wsocket);
client.getPolicy().setIdleTimeout(500);
URI wsUri = server.getWsUri();
Future<UpgradeResponse> future = client.connect(wsUri);
ServerConnection ssocket = server.accept();
ssocket.upgrade();
future.get(250,TimeUnit.MILLISECONDS);
wsocket.assertIsOpen();
// Send some messages from client to server
byte[] recv = new byte[1024];
int len = -1;
for (int i = 0; i < 10; i++)
{
Thread.sleep(250);
wsocket.getConnection().write(null,new FutureCallback<Void>(),"Hello");
len = ssocket.getInputStream().read(recv,0,recv.length);
Assert.assertTrue(len > 0);
}
// Send some messages from server to client
byte[] send = new byte[]
{ (byte)0x81, (byte)0x02, (byte)'H', (byte)'i' };
for (int i = 0; i < 10; i++)
{
Thread.sleep(250);
ssocket.write(send,0,send.length);
ssocket.flush();
Assert.assertEquals("Hi",wsocket.messageQueue.poll(1,TimeUnit.SECONDS));
}
// Close with code
long start = System.currentTimeMillis();
ssocket.write(new byte[]
{ (byte)0x88, (byte)0x02, (byte)4, (byte)87 },0,4);
ssocket.flush();
wsocket.closeLatch.await(10,TimeUnit.SECONDS);
long dur = (System.currentTimeMillis() - start);
Assert.assertThat("Overall duration",dur,lessThanOrEqualTo(5000L));
wsocket.assertClose(StatusCode.PROTOCOL,"Invalid close code 1111");
}
@Test
@Ignore("Test for is-open is broken")
public void testUpgradeThenTCPClose() throws Exception
{
TrackingSocket wsocket = new TrackingSocket();
WebSocketClient client = factory.newWebSocketClient(wsocket);
URI wsUri = server.getWsUri();
Future<UpgradeResponse> future = client.connect(wsUri);
ServerConnection ssocket = server.accept();
ssocket.upgrade();
future.get(500,TimeUnit.MILLISECONDS);
wsocket.assertIsOpen();
ssocket.disconnect();
Assert.assertThat("Close should have been detected",wsocket.closeLatch.await(10,TimeUnit.SECONDS),is(true));
}
}

View File

@ -18,6 +18,8 @@
package org.eclipse.jetty.websocket.client.blockhead;
import static org.hamcrest.Matchers.*;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
@ -62,9 +64,6 @@ import org.eclipse.jetty.websocket.protocol.Parser;
import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
import org.junit.Assert;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
/**
* A overly simplistic websocket server used during testing.
* <p>
@ -152,6 +151,16 @@ public class BlockheadServer
getOutputStream().flush();
}
public ByteBufferPool getBufferPool()
{
return bufferPool;
}
public IncomingFramesCapture getIncomingFrames()
{
return incomingFrames;
}
public InputStream getInputStream() throws IOException
{
if (in == null)
@ -170,6 +179,16 @@ public class BlockheadServer
return out;
}
public Parser getParser()
{
return parser;
}
public WebSocketPolicy getPolicy()
{
return policy;
}
@Override
public void incoming(WebSocketException e)
{

View File

@ -1,5 +1,5 @@
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
# org.eclipse.jetty.LEVEL=DEBUG
org.eclipse.jetty.LEVEL=WARN
# org.eclipse.jetty.io.ChannelEndPoint.LEVEL=INFO
org.eclipse.jetty.websocket.LEVEL=WARN
# org.eclipse.jetty.websocket.LEVEL=DEBUG

View File

@ -25,6 +25,18 @@ import java.net.InetSocketAddress;
*/
public interface BaseConnection
{
public static enum State
{
/** Connection created, but not yet connected */
OPENING,
/** Connection created and connected */
OPENED,
/** Close handshake initiated, response pending. */
CLOSING,
/** Close handshake responded. */
CLOSED
}
/**
* Connection suspend token
*/
@ -37,7 +49,7 @@ public interface BaseConnection
}
/**
* Terminate connection, {@link StatusCode#NORMAL}, without a reason.
* Send a websocket Close frame, {@link StatusCode#NORMAL}, without a reason.
* <p>
* Basic usage: results in an non-blocking async write, then connection close.
*
@ -47,7 +59,7 @@ public interface BaseConnection
void close();
/**
* Terminate connection, with status code.
* Send a websocket Close frame, with status code.
* <p>
* Advanced usage: results in an non-blocking async write, then connection close.
*
@ -59,6 +71,11 @@ public interface BaseConnection
*/
void close(int statusCode, String reason);
/**
* Terminate the connection (no close frame sent)
*/
void disconnect();
/**
* Get the remote Address in use for this connection.
*
@ -66,6 +83,13 @@ public interface BaseConnection
*/
InetSocketAddress getRemoteAddress();
/**
* Get the state of the connection.
*
* @return the state of the connection.
*/
State getState();
/**
* Simple test to see if connection is open (and not closed)
*
@ -80,6 +104,11 @@ public interface BaseConnection
*/
boolean isReading();
/**
* Notify that the connection has entered the closing handshake
*/
void notifyClosing();
/**
* Suspend a the incoming read events on the connection.
*

View File

@ -28,6 +28,7 @@ import org.eclipse.jetty.util.Utf8Appendable.NotUtf8Exception;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.annotations.WebSocket;
import org.eclipse.jetty.websocket.api.BaseConnection;
import org.eclipse.jetty.websocket.api.CloseException;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.WebSocketException;
@ -169,7 +170,22 @@ public class WebSocketEventDriver implements IncomingFrames
{
events.onClose.call(websocket,session,close.getStatusCode(),close.getReason());
}
throw new CloseException(close.getStatusCode(),close.getReason());
// Is this close frame a response to a prior close?
if (session.getState() == BaseConnection.State.CLOSING)
{
// Then this is close response handshake (to a prior
// outgoing close frame)
session.disconnect();
}
else
{
// This is the initiator for a close handshake
// Trigger close response handshake.
session.notifyClosing();
session.close(close.getStatusCode(),close.getReason());
}
return;
}
case OpCode.PING:
{

View File

@ -68,6 +68,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
private List<ExtensionConfig> extensions;
private boolean flushing;
private boolean isFilling;
private BaseConnection.State connectionState;
public AbstractWebSocketConnection(EndPoint endp, Executor executor, Scheduler scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool)
{
@ -80,18 +81,19 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
this.extensions = new ArrayList<>();
this.queue = new FrameQueue();
this.suspendToken = new AtomicBoolean(false);
this.connectionState = BaseConnection.State.OPENING;
}
@Override
public void close()
{
terminateConnection(StatusCode.NORMAL,null);
close(StatusCode.NORMAL,null);
}
@Override
public void close(int statusCode, String reason)
{
terminateConnection(statusCode,reason);
enqueClose(statusCode,reason);
}
public <C> void complete(FrameBytes<C> frameBytes)
@ -106,8 +108,15 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
}
}
@Override
public void disconnect()
{
disconnect(false);
}
public void disconnect(boolean onlyOutput)
{
connectionState = BaseConnection.State.CLOSED;
EndPoint endPoint = getEndPoint();
// We need to gently close first, to allow
// SSL close alerts to be sent by Jetty
@ -120,6 +129,24 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
}
}
/**
* Enqueue a close frame.
*
* @param statusCode
* the WebSocket status code.
* @param reason
* the (optional) reason string. (null is allowed)
* @see StatusCode
*/
private void enqueClose(int statusCode, String reason)
{
CloseInfo close = new CloseInfo(statusCode,reason);
FutureCallback<Void> nop = new FutureCallback<>();
ControlFrameBytes<Void> frameBytes = new ControlFrameBytes<Void>(this,nop,null,close.asFrame());
queue.append(frameBytes);
flush();
}
public void flush()
{
FrameBytes<?> frameBytes = null;
@ -153,7 +180,11 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
LOG.debug("Flushing {}, {} frame(s) in queue",frameBytes,queue.size());
}
}
write(buffer,frameBytes);
if (connectionState != BaseConnection.State.CLOSED)
{
write(buffer,frameBytes);
}
}
public ByteBufferPool getBufferPool()
@ -209,10 +240,16 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
return session;
}
@Override
public State getState()
{
return connectionState;
}
@Override
public boolean isOpen()
{
return getEndPoint().isOpen();
return (getState() != BaseConnection.State.CLOSED) && getEndPoint().isOpen();
}
@Override
@ -221,6 +258,19 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
return isFilling;
}
@Override
public void notifyClosing()
{
this.connectionState = BaseConnection.State.CLOSING;
}
@Override
public void onClose()
{
super.onClose();
this.connectionState = BaseConnection.State.CLOSED;
}
@Override
public void onFillable()
{
@ -252,6 +302,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
public void onOpen()
{
super.onOpen();
this.connectionState = BaseConnection.State.OPENED;
LOG.debug("fillInterested");
fillInterested();
}
@ -327,13 +378,13 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
catch (IOException e)
{
LOG.warn(e);
terminateConnection(StatusCode.PROTOCOL,e.getMessage());
enqueClose(StatusCode.PROTOCOL,e.getMessage());
return -1;
}
catch (CloseException e)
{
LOG.warn(e);
terminateConnection(e.getStatusCode(),e.getMessage());
enqueClose(e.getStatusCode(),e.getMessage());
return -1;
}
}
@ -341,7 +392,8 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
@Override
public void resume()
{
if(suspendToken.getAndSet(false)) {
if (suspendToken.getAndSet(false))
{
fillInterested();
}
}
@ -379,24 +431,6 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
return this;
}
/**
* For terminating connections forcefully.
*
* @param statusCode
* the WebSocket status code.
* @param reason
* the (optional) reason string. (null is allowed)
* @see StatusCode
*/
private void terminateConnection(int statusCode, String reason)
{
CloseInfo close = new CloseInfo(statusCode,reason);
FutureCallback<Void> nop = new FutureCallback<>();
ControlFrameBytes<Void> frameBytes = new ControlFrameBytes<Void>(this,nop,null,close.asFrame());
queue.append(frameBytes);
flush();
}
@Override
public String toString()
{
@ -411,6 +445,13 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
{
LOG_FRAMES.debug("{} Writing {} frame bytes of {}",policy.getBehavior(),buffer.remaining(),frameBytes);
}
if (connectionState == BaseConnection.State.CLOSED)
{
// connection is closed, STOP WRITING, geez.
return;
}
try
{
endpoint.write(frameBytes.context,frameBytes,buffer);

View File

@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.BaseConnection;
import org.eclipse.jetty.websocket.protocol.OpCode;
import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
@ -45,8 +46,17 @@ public class ControlFrameBytes<C> extends FrameBytes<C>
if (frame.getOpCode() == OpCode.CLOSE)
{
// Disconnect the connection (no more packets/frames)
connection.disconnect(false);
// is this outgoing close frame a response to a close?
if (connection.getState() == BaseConnection.State.CLOSING)
{
// Disconnect the connection (no more packets/frames)
connection.disconnect(false);
}
else
{
// Then this is the initiator for a close handshake.
connection.notifyClosing();
}
}
else
{

View File

@ -20,7 +20,6 @@ package org.eclipse.jetty.websocket.io;
import java.nio.ByteBuffer;
import java.nio.channels.InterruptedByTimeoutException;
import java.util.concurrent.ScheduledFuture;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.util.Callback;
@ -51,7 +50,9 @@ public abstract class FrameBytes<C> implements Callback<C>, Runnable
{
Scheduler.Task task = this.task;
if (task != null)
{
task.cancel();
}
}
@Override

View File

@ -37,6 +37,7 @@ import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
public class WebSocketSession implements WebSocketConnection, IncomingFrames, OutgoingFrames
{
private static final Logger LOG = Log.getLogger(WebSocketSession.class);
/**
* The reference to the base connection.
* <p>
@ -69,6 +70,12 @@ public class WebSocketSession implements WebSocketConnection, IncomingFrames, Ou
baseConnection.close(statusCode,reason);
}
@Override
public void disconnect()
{
baseConnection.disconnect();
}
public IncomingFrames getIncoming()
{
return websocket;
@ -91,6 +98,12 @@ public class WebSocketSession implements WebSocketConnection, IncomingFrames, Ou
return baseConnection.getRemoteAddress();
}
@Override
public BaseConnection.State getState()
{
return baseConnection.getState();
}
@Override
public String getSubProtocol()
{
@ -123,6 +136,12 @@ public class WebSocketSession implements WebSocketConnection, IncomingFrames, Ou
return baseConnection.isReading();
}
@Override
public void notifyClosing()
{
baseConnection.notifyClosing();
}
public void onConnect()
{
LOG.debug("onConnect()");

View File

@ -56,6 +56,11 @@ public class LocalWebSocketConnection implements WebSocketConnection
{
}
@Override
public void disconnect()
{
}
@Override
public WebSocketPolicy getPolicy()
{
@ -68,6 +73,12 @@ public class LocalWebSocketConnection implements WebSocketConnection
return null;
}
@Override
public State getState()
{
return null;
}
@Override
public String getSubProtocol()
{
@ -86,10 +97,14 @@ public class LocalWebSocketConnection implements WebSocketConnection
return false;
}
@Override
public void notifyClosing()
{
}
@Override
public <C> void ping(C context, Callback<C> callback, byte[] payload) throws IOException
{
}
@Override

View File

@ -11,7 +11,7 @@ org.eclipse.jetty.websocket.server.helper.RFCSocket.LEVEL=OFF
# org.eclipse.jetty.util.thread.QueuedThreadPool.LEVEL=DEBUG
# org.eclipse.jetty.io.SelectorManager.LEVEL=INFO
# org.eclipse.jetty.websocket.LEVEL=DEBUG
# org.eclipse.jetty.websocket.io.FrameBytes.LEVEL=DEBUG
org.eclipse.jetty.websocket.io.FrameBytes.LEVEL=OFF
# org.eclipse.jetty.websocket.io.ControlFrameBytes.LEVEL=DEBUG
# org.eclipse.jetty.websocket.driver.WebSocketEventDriver.LEVEL=DEBUG
# org.eclipse.jetty.websocket.extensions.LEVEL=DEBUG