Issue #207 - ongoing testing improvements

This commit is contained in:
Joakim Erdfelt 2017-04-07 16:24:12 -07:00
parent 2d01d1431a
commit 9620d5c5e8
13 changed files with 61 additions and 55 deletions

View File

@ -33,7 +33,6 @@ import javax.websocket.MessageHandler;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.api.FrameCallback;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.common.FrameCallbackAdapter;
import org.eclipse.jetty.websocket.common.frames.BinaryFrame;
import org.eclipse.jetty.websocket.common.frames.ContinuationFrame;
import org.eclipse.jetty.websocket.common.frames.TextFrame;
@ -128,7 +127,7 @@ public class JsrSessionTest
session.open();
FrameCallback callback = new FrameCallbackAdapter();
FrameCallback callback = new FrameCallback.Adapter();
session.incomingFrame(new TextFrame().setPayload("G'day").setFin(true), callback);
session.incomingFrame(new TextFrame().setPayload("Hello World").setFin(true), callback);
@ -159,7 +158,7 @@ public class JsrSessionTest
session.open();
FrameCallback callback = new FrameCallbackAdapter();
FrameCallback callback = new FrameCallback.Adapter();
session.incomingFrame(new BinaryFrame().setPayload("G'day").setFin(false), callback);
session.incomingFrame(new ContinuationFrame().setPayload(" World").setFin(true), callback);

View File

@ -566,6 +566,7 @@ public class Parser
int bytesAvailable = buffer.remaining();
int windowBytes = Math.min(bytesAvailable, bytesExpected);
int limit = buffer.limit();
assert(buffer.position() + windowBytes < buffer.capacity());
buffer.limit(buffer.position() + windowBytes);
ByteBuffer window = buffer.slice();
buffer.limit(limit);

View File

@ -20,6 +20,8 @@ package org.eclipse.jetty.websocket.common;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.HashMap;
@ -30,6 +32,7 @@ import java.util.Objects;
import java.util.ServiceLoader;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
@ -99,6 +102,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
private UpgradeRequest upgradeRequest;
private UpgradeResponse upgradeResponse;
private CompletableFuture<Session> openFuture;
private AtomicReference<Throwable> pendingError = new AtomicReference<>();
public WebSocketSession(WebSocketContainerScope containerScope, URI requestURI, Object endpoint, LogicalConnection connection)
{
@ -181,6 +185,14 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
super.doStart();
connection.setMaxIdleTimeout(this.policy.getIdleTimeout());
Throwable fastFail;
synchronized (pendingError)
{
fastFail = pendingError.get();
}
if(fastFail != null)
onError(fastFail);
}
@Override
@ -519,6 +531,16 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
@Override
public void onError(Throwable t)
{
synchronized (pendingError)
{
if (!endpointFunctions.isStarted())
{
// this is a *really* fast fail, before the Session has even started.
pendingError.compareAndSet(null, t);
return;
}
}
Throwable cause = getInvokedCause(t);
if (openFuture != null && !openFuture.isDone())
@ -531,10 +553,18 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
{
close(StatusCode.BAD_PAYLOAD, cause.getMessage());
}
else if (cause instanceof SocketTimeoutException)
{
close(StatusCode.SHUTDOWN, cause.getMessage());
}
else if (cause instanceof IOException)
{
close(StatusCode.PROTOCOL, cause.getMessage());
}
else if (cause instanceof SocketException)
{
close(StatusCode.SHUTDOWN, cause.getMessage());
}
else if (cause instanceof CloseException)
{
CloseException ce = (CloseException) cause;

View File

@ -594,15 +594,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
return true;
}
try
{
notifyError(new SocketTimeoutException("Timeout on Read"));
}
finally
{
// This is an Abnormal Close condition
close(StatusCode.SHUTDOWN,"Idle Timeout");
}
return false;
}

View File

@ -1,25 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.common;
import org.eclipse.jetty.websocket.api.FrameCallback;
public class FrameCallbackAdapter extends FrameCallback.Adapter
{
}

View File

@ -26,12 +26,12 @@ import java.util.Collections;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.websocket.api.FrameCallback;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.extensions.Extension;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.common.FrameCallbackAdapter;
import org.eclipse.jetty.websocket.common.Parser;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.frames.TextFrame;
@ -62,7 +62,7 @@ public class ExtensionTool
this.capture = new IncomingFramesCapture();
this.parser = new UnitParser(policy, frame ->
{
ext.incomingFrame(frame, new FrameCallbackAdapter());
ext.incomingFrame(frame, new FrameCallback.Adapter());
return true;
});
}

View File

@ -29,10 +29,10 @@ import java.util.List;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.FrameCallback;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.common.FrameCallbackAdapter;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.extensions.fragment.FragmentExtension;
@ -78,7 +78,7 @@ public class FragmentExtensionTest
for (String q : quote)
{
Frame frame = new TextFrame().setPayload(q);
ext.incomingFrame(frame, new FrameCallbackAdapter());
ext.incomingFrame(frame, new FrameCallback.Adapter());
}
int len = quote.size();
@ -122,7 +122,7 @@ public class FragmentExtensionTest
String payload = "Are you there?";
Frame ping = new PingFrame().setPayload(payload);
ext.incomingFrame(ping, new FrameCallbackAdapter());
ext.incomingFrame(ping, new FrameCallback.Adapter());
capture.assertFrameCount(1);
capture.assertHasFrame(OpCode.PING, 1);

View File

@ -26,9 +26,9 @@ import java.nio.charset.StandardCharsets;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.FrameCallback;
import org.eclipse.jetty.websocket.api.extensions.Extension;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.common.FrameCallbackAdapter;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.extensions.identity.IdentityExtension;
@ -53,7 +53,7 @@ public class IdentityExtensionTest
ext.setNextIncomingFrames(capture);
Frame frame = new TextFrame().setPayload("hello");
ext.incomingFrame(frame, new FrameCallbackAdapter());
ext.incomingFrame(frame, new FrameCallback.Adapter());
capture.assertFrameCount(1);
capture.assertHasFrame(OpCode.TEXT, 1);

View File

@ -19,7 +19,7 @@
package org.eclipse.jetty.websocket.common.extensions.compress;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.*;
import static org.junit.Assert.assertThat;
import java.io.IOException;
import java.nio.ByteBuffer;
@ -30,10 +30,10 @@ import java.util.List;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.FrameCallback;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.common.FrameCallbackAdapter;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.extensions.AbstractExtensionTest;
@ -246,7 +246,7 @@ public class PerMessageDeflateExtensionTest extends AbstractExtensionTest
String payload = "Are you there?";
Frame ping = new PingFrame().setPayload(payload);
ext.incomingFrame(ping, new FrameCallbackAdapter());
ext.incomingFrame(ping, new FrameCallback.Adapter());
capture.assertFrameCount(1);
capture.assertHasFrame(OpCode.PING, 1);
@ -292,7 +292,7 @@ public class PerMessageDeflateExtensionTest extends AbstractExtensionTest
{
TextFrame frame = new TextFrame().setPayload(q);
frame.setRsv1(false); // indication to extension that frame is not compressed (ie: a normal frame)
ext.incomingFrame(frame, new FrameCallbackAdapter());
ext.incomingFrame(frame, new FrameCallback.Adapter());
}
int len = quote.size();

View File

@ -619,7 +619,7 @@ public class BlockheadClient implements OutgoingFrames, ConnectionStateListener,
@Override
public boolean onFrame(Frame frame)
{
// TODO: do something with frame?
extensionStack.incomingFrame(frame, new FrameCallback.Adapter());
return true;
}

View File

@ -56,7 +56,6 @@ import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
import org.eclipse.jetty.websocket.common.AcceptHash;
import org.eclipse.jetty.websocket.common.CloseInfo;
import org.eclipse.jetty.websocket.common.FrameCallbackAdapter;
import org.eclipse.jetty.websocket.common.Generator;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.Parser;
@ -103,7 +102,7 @@ public class BlockheadServerConnection implements IncomingFrames, OutgoingFrames
this.bufferPool = new MappedByteBufferPool(BUFFER_SIZE);
this.parser = new Parser(policy,bufferPool, frame ->
{
extensionStack.incomingFrame(frame, new FrameCallbackAdapter());
extensionStack.incomingFrame(frame, new FrameCallback.Adapter());
return true;
});
this.parseCount = new AtomicInteger(0);
@ -458,8 +457,11 @@ public class BlockheadServerConnection implements IncomingFrames, OutgoingFrames
readBytes += len;
LOG.debug("Read {} bytes",len);
BufferUtil.flipToFlush(buf,0);
while(buf.hasRemaining())
{
parser.parse(buf);
}
}
try
{

View File

@ -51,7 +51,7 @@ public class IdentityExtensionTest
server.stop();
}
@Test
@Test(timeout = 10000)
public void testIdentityExtension() throws Exception
{
BlockheadClient client = new BlockheadClient(server.getServerUri());

View File

@ -27,6 +27,7 @@ import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
@ -60,7 +61,7 @@ public class RFCSocket
public void onText(String message) throws IOException
{
LOG.debug("onText({})",message);
// Test the RFC 6455 close code 1011 that should close
// Test the RFC 6455 close code 1011 that should close.
// trigger a WebSocket server terminated close.
if (message.equals("CRASH"))
{
@ -73,4 +74,10 @@ public class RFCSocket
if (remote.getBatchMode() == BatchMode.ON)
remote.flush();
}
@OnWebSocketError
public void onError(Throwable cause)
{
LOG.warn("onError()", cause);
}
}