Issue #2282 - more websocket test stablization

Signed-off-by: Joakim Erdfelt <joakim.erdfelt@gmail.com>
This commit is contained in:
Joakim Erdfelt 2018-03-07 17:41:45 -06:00
parent abf1262848
commit e76f6d7baf
11 changed files with 51 additions and 34 deletions

View File

@ -186,7 +186,7 @@ public class ClientConnectTest
ClientUpgradeRequest upgradeRequest = new ClientUpgradeRequest(); ClientUpgradeRequest upgradeRequest = new ClientUpgradeRequest();
// actual value for this test is irrelevant, its important that this // actual value for this test is irrelevant, its important that this
// header actually be sent with a value (the value specified) // header actually be sent with a value (the value specified)
upgradeRequest.setHeader("Authorization", "Bogus SHA1"); upgradeRequest.setHeader("Authorization", "Basic YWxhZGRpbjpvcGVuc2VzYW1l");
Future<Session> future = client.connect(wsocket,wsUri,upgradeRequest); Future<Session> future = client.connect(wsocket,wsUri,upgradeRequest);
try (BlockheadConnection serverConn = serverConnFut.get(Timeouts.CONNECT, Timeouts.CONNECT_UNIT)) try (BlockheadConnection serverConn = serverConnFut.get(Timeouts.CONNECT, Timeouts.CONNECT_UNIT))
@ -194,13 +194,14 @@ public class ClientConnectTest
HttpFields upgradeRequestHeaders = serverConn.getUpgradeRequestHeaders(); HttpFields upgradeRequestHeaders = serverConn.getUpgradeRequestHeaders();
Session sess = future.get(30, TimeUnit.SECONDS); Session sess = future.get(30, TimeUnit.SECONDS);
sess.close();
HttpField authHeader = upgradeRequestHeaders.getField(HttpHeader.AUTHORIZATION); HttpField authHeader = upgradeRequestHeaders.getField(HttpHeader.AUTHORIZATION);
assertThat("Server Request Authorization Header", authHeader, is(notNullValue())); assertThat("Server Request Authorization Header", authHeader, is(notNullValue()));
assertThat("Server Request Authorization Value", authHeader.getValue(), is("Authorization: Bogus SHA1")); assertThat("Server Request Authorization Value", authHeader.getValue(), is("Basic YWxhZGRpbjpvcGVuc2VzYW1l"));
assertThat("Connect.UpgradeRequest", wsocket.connectUpgradeRequest, notNullValue()); assertThat("Connect.UpgradeRequest", wsocket.connectUpgradeRequest, notNullValue());
assertThat("Connect.UpgradeResponse", wsocket.connectUpgradeResponse, notNullValue()); assertThat("Connect.UpgradeResponse", wsocket.connectUpgradeResponse, notNullValue());
sess.close();
} }
} }

View File

@ -28,7 +28,6 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.client.masks.ZeroMasker; import org.eclipse.jetty.websocket.client.masks.ZeroMasker;
import org.eclipse.jetty.websocket.common.OpCode; import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.WebSocketFrame; import org.eclipse.jetty.websocket.common.WebSocketFrame;
@ -120,14 +119,8 @@ public class SlowServerTest
String prefix = "Server Frame[" + i + "]"; String prefix = "Server Frame[" + i + "]";
Assert.assertThat(prefix, serverFrame, is(notNullValue())); Assert.assertThat(prefix, serverFrame, is(notNullValue()));
Assert.assertThat(prefix + ".opCode", serverFrame.getOpCode(), is(OpCode.TEXT)); Assert.assertThat(prefix + ".opCode", serverFrame.getOpCode(), is(OpCode.TEXT));
Assert.assertThat(prefix + ".payload", serverFrame.getPayloadAsUTF8(), is("Hello")); Assert.assertThat(prefix + ".payload", serverFrame.getPayloadAsUTF8(), is("Hello/" + i + "/"));
} }
// Close
tsocket.getSession().close(StatusCode.NORMAL, "Done");
Assert.assertTrue("Client Socket Closed", tsocket.closeLatch.await(10, TimeUnit.SECONDS));
tsocket.assertCloseCode(StatusCode.NORMAL);
} }
} }
@ -161,11 +154,6 @@ public class SlowServerTest
// Verify receive // Verify receive
Assert.assertThat("Message Receive Count", clientSocket.messageQueue.size(), is(messageCount)); Assert.assertThat("Message Receive Count", clientSocket.messageQueue.size(), is(messageCount));
}
// Close server connection (by exiting try-with-resources)
}
Assert.assertTrue("Client Socket Closed", clientSocket.closeLatch.await(10, TimeUnit.SECONDS));
clientSocket.assertCloseCode(StatusCode.NORMAL);
} }
} }

View File

@ -95,7 +95,7 @@ public class TomcatServerQuirksTest
server.setRequestHandling((req, resp) -> { server.setRequestHandling((req, resp) -> {
// Add the extra problematic header that triggers bug found in jetty-io // Add the extra problematic header that triggers bug found in jetty-io
resp.setHeader("Transfer-Encoding", "chunked"); resp.setHeader("Transfer-Encoding", "chunked");
return true; return false;
}); });
try try

View File

@ -25,6 +25,7 @@ import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;
import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URI; import java.net.URI;
import java.util.Arrays; import java.util.Arrays;
@ -70,6 +71,8 @@ public class WebSocketClientTest
public static void startServer() throws Exception public static void startServer() throws Exception
{ {
server = new BlockheadServer(); server = new BlockheadServer();
server.getPolicy().setMaxTextMessageSize(200 * 1024);
server.getPolicy().setMaxBinaryMessageSize(200 * 1024);
server.start(); server.start();
} }
@ -80,7 +83,7 @@ public class WebSocketClientTest
} }
@AfterClass @AfterClass
public void stopServer() throws Exception public static void stopServer() throws Exception
{ {
server.stop(); server.stop();
} }
@ -122,6 +125,7 @@ public class WebSocketClientTest
// Setup echo of frames on server side // Setup echo of frames on server side
serverConn.setIncomingFrameConsumer((frame)->{ serverConn.setIncomingFrameConsumer((frame)->{
WebSocketFrame copy = WebSocketFrame.copy(frame); WebSocketFrame copy = WebSocketFrame.copy(frame);
copy.setMask(null); // strip client mask (if present)
serverConn.write(copy); serverConn.write(copy);
}); });
@ -237,7 +241,8 @@ public class WebSocketClientTest
assertThat("Local Socket Address / Host",local.getAddress().getHostAddress(),notNullValue()); assertThat("Local Socket Address / Host",local.getAddress().getHostAddress(),notNullValue());
assertThat("Local Socket Address / Port",local.getPort(),greaterThan(0)); assertThat("Local Socket Address / Port",local.getPort(),greaterThan(0));
assertThat("Remote Socket Address / Host",remote.getAddress().getHostAddress(),is(wsUri.getHost())); String uriHostAddress = InetAddress.getByName(wsUri.getHost()).getHostAddress();
assertThat("Remote Socket Address / Host",remote.getAddress().getHostAddress(),is(uriHostAddress));
assertThat("Remote Socket Address / Port",remote.getPort(),greaterThan(0)); assertThat("Remote Socket Address / Port",remote.getPort(),greaterThan(0));
} }
@ -264,6 +269,7 @@ public class WebSocketClientTest
// Setup echo of frames on server side // Setup echo of frames on server side
serverConn.setIncomingFrameConsumer((frame)->{ serverConn.setIncomingFrameConsumer((frame)->{
WebSocketFrame copy = WebSocketFrame.copy(frame); WebSocketFrame copy = WebSocketFrame.copy(frame);
copy.setMask(null); // strip client mask (if present)
serverConn.write(copy); serverConn.write(copy);
}); });

View File

@ -18,6 +18,7 @@
package org.eclipse.jetty.websocket.common.test; package org.eclipse.jetty.websocket.common.test;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
@ -29,9 +30,14 @@ import org.eclipse.jetty.websocket.common.extensions.ExtensionStack;
public class BlockheadClientConnection extends BlockheadConnection public class BlockheadClientConnection extends BlockheadConnection
{ {
public BlockheadClientConnection(WebSocketPolicy policy, ByteBufferPool bufferPool, ExtensionStack extensionStack, EndPoint endp, Executor executor) public BlockheadClientConnection(WebSocketPolicy policy,
ByteBufferPool bufferPool,
ExtensionStack extensionStack,
CompletableFuture<BlockheadConnection> openFut,
EndPoint endp,
Executor executor)
{ {
super(policy, bufferPool, extensionStack, endp, executor); super(policy, bufferPool, extensionStack, openFut, endp, executor);
} }
@Override @Override

View File

@ -222,6 +222,7 @@ public class BlockheadClientRequest extends HttpRequest implements Response.Comp
client.getPolicy(), client.getPolicy(),
client.getBufferPool(), client.getBufferPool(),
extensionStack, extensionStack,
fut,
endp, endp,
client.getExecutor()); client.getExecutor());
@ -232,8 +233,6 @@ public class BlockheadClientRequest extends HttpRequest implements Response.Comp
// Now swap out the connection // Now swap out the connection
endp.upgrade(connection); endp.upgrade(connection);
fut.complete(connection);
} }
/** /**

View File

@ -21,6 +21,7 @@ package org.eclipse.jetty.websocket.common.test;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer; import java.util.function.Consumer;
@ -57,11 +58,12 @@ public class BlockheadConnection extends AbstractConnection implements Connectio
private final ExtensionStack extensionStack; private final ExtensionStack extensionStack;
private final OutgoingNetwork networkOutgoing; private final OutgoingNetwork networkOutgoing;
private final IncomingCapture incomingCapture; private final IncomingCapture incomingCapture;
private final CompletableFuture<BlockheadConnection> openFuture;
private ByteBuffer networkBuffer; private ByteBuffer networkBuffer;
private HttpFields upgradeResponseHeaders; private HttpFields upgradeResponseHeaders;
private HttpFields upgradeRequestHeaders; private HttpFields upgradeRequestHeaders;
public BlockheadConnection(WebSocketPolicy policy, ByteBufferPool bufferPool, ExtensionStack extensionStack, EndPoint endp, Executor executor) public BlockheadConnection(WebSocketPolicy policy, ByteBufferPool bufferPool, ExtensionStack extensionStack, CompletableFuture<BlockheadConnection> openFut, EndPoint endp, Executor executor)
{ {
super(endp, executor); super(endp, executor);
this.LOG = Log.getLogger(this.getClass()); this.LOG = Log.getLogger(this.getClass());
@ -70,6 +72,7 @@ public class BlockheadConnection extends AbstractConnection implements Connectio
this.parser = new Parser(policy, bufferPool); this.parser = new Parser(policy, bufferPool);
this.generator = new Generator(policy, bufferPool, false); this.generator = new Generator(policy, bufferPool, false);
this.extensionStack = extensionStack; this.extensionStack = extensionStack;
this.openFuture = openFut;
this.extensionStack.configure(this.parser); this.extensionStack.configure(this.parser);
this.extensionStack.configure(this.generator); this.extensionStack.configure(this.generator);
@ -179,22 +182,26 @@ public class BlockheadConnection extends AbstractConnection implements Connectio
public void onOpen() public void onOpen()
{ {
super.onOpen(); super.onOpen();
if(this.openFuture != null)
this.openFuture.complete(this);
fillInterested(); fillInterested();
} }
public void processConnectionError(Throwable cause) public void processConnectionError(Throwable cause)
{ {
LOG.warn("Connection Error", cause); LOG.warn("Connection Error", cause);
if(this.openFuture != null)
this.openFuture.completeExceptionally(cause);
} }
public void setUpgradeRequestHeaders(HttpFields upgradeRequestHeaders) public void setUpgradeRequestHeaders(HttpFields upgradeRequestHeaders)
{ {
this.upgradeRequestHeaders = upgradeRequestHeaders; this.upgradeRequestHeaders = new HttpFields(upgradeRequestHeaders);
} }
public void setUpgradeResponseHeaders(HttpFields upgradeResponseHeaders) public void setUpgradeResponseHeaders(HttpFields upgradeResponseHeaders)
{ {
this.upgradeResponseHeaders = upgradeResponseHeaders; this.upgradeResponseHeaders = new HttpFields(upgradeResponseHeaders);
} }
public void setIncomingFrameConsumer(Consumer<Frame> consumer) public void setIncomingFrameConsumer(Consumer<Frame> consumer)
@ -209,7 +216,11 @@ public class BlockheadConnection extends AbstractConnection implements Connectio
public void writeRaw(ByteBuffer buf) throws IOException public void writeRaw(ByteBuffer buf) throws IOException
{ {
getEndPoint().flush(buf); boolean done = false;
while (!done)
{
done = getEndPoint().flush(buf);
}
} }
public void writeRaw(ByteBuffer buf, int numBytes) throws IOException public void writeRaw(ByteBuffer buf, int numBytes) throws IOException

View File

@ -241,6 +241,7 @@ public class BlockheadServer
policy, policy,
bufferPool, bufferPool,
extensionStack, extensionStack,
connFut,
endp, endp,
executor); executor);
@ -258,10 +259,6 @@ public class BlockheadServer
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Websocket upgrade {} {}", request.getRequestURI(), wsConnection); LOG.debug("Websocket upgrade {} {}", request.getRequestURI(), wsConnection);
if(connFut != null)
connFut.complete(wsConnection);
} }
catch(Throwable cause) catch(Throwable cause)
{ {

View File

@ -18,6 +18,7 @@
package org.eclipse.jetty.websocket.common.test; package org.eclipse.jetty.websocket.common.test;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool;
@ -27,8 +28,13 @@ import org.eclipse.jetty.websocket.common.extensions.ExtensionStack;
public class BlockheadServerConnection extends BlockheadConnection public class BlockheadServerConnection extends BlockheadConnection
{ {
public BlockheadServerConnection(WebSocketPolicy policy, ByteBufferPool bufferPool, ExtensionStack extensionStack, EndPoint endp, Executor executor) public BlockheadServerConnection(WebSocketPolicy policy,
ByteBufferPool bufferPool,
ExtensionStack extensionStack,
CompletableFuture<BlockheadConnection> openFut,
EndPoint endp,
Executor executor)
{ {
super(policy, bufferPool, extensionStack, endp, executor); super(policy, bufferPool, extensionStack, openFut, endp, executor);
} }
} }

View File

@ -20,6 +20,7 @@ package org.eclipse.jetty.websocket.common.test;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import java.io.IOException; import java.io.IOException;
import java.net.SocketException; import java.net.SocketException;
@ -184,6 +185,7 @@ public class Fuzzer implements AutoCloseable
LOG.debug("{} {}",prefix,actual); LOG.debug("{} {}",prefix,actual);
Assert.assertThat(prefix, actual, is(notNullValue()));
Assert.assertThat(prefix + ".opcode",OpCode.name(actual.getOpCode()),is(OpCode.name(expected.getOpCode()))); Assert.assertThat(prefix + ".opcode",OpCode.name(actual.getOpCode()),is(OpCode.name(expected.getOpCode())));
prefix += "/" + actual.getOpCode(); prefix += "/" + actual.getOpCode();
if (expected.getOpCode() == OpCode.CLOSE) if (expected.getOpCode() == OpCode.CLOSE)

View File

@ -34,6 +34,7 @@ import org.eclipse.jetty.websocket.common.frames.ContinuationFrame;
import org.eclipse.jetty.websocket.common.frames.DataFrame; import org.eclipse.jetty.websocket.common.frames.DataFrame;
import org.eclipse.jetty.websocket.common.frames.TextFrame; import org.eclipse.jetty.websocket.common.frames.TextFrame;
import org.eclipse.jetty.websocket.common.test.Fuzzer; import org.eclipse.jetty.websocket.common.test.Fuzzer;
import org.eclipse.jetty.websocket.common.test.Timeouts;
import org.junit.Test; import org.junit.Test;
/** /**
@ -176,7 +177,7 @@ public class TestABCase9 extends AbstractABCase
fuzzer.connect(); fuzzer.connect();
fuzzer.setSendMode(Fuzzer.SendMode.BULK); fuzzer.setSendMode(Fuzzer.SendMode.BULK);
fuzzer.send(send); fuzzer.send(send);
fuzzer.expect(expect); fuzzer.expect(expect, Timeouts.POLL_EVENT*2, Timeouts.POLL_EVENT_UNIT);
} }
} }