Issue #207 - Improved Close notification and testing
This commit is contained in:
parent
30595ccdaa
commit
0e2656b938
|
@ -85,7 +85,8 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
|
|||
private final LogicalConnection connection;
|
||||
private final Executor executor;
|
||||
private final AtomicConnectionState connectionState = new AtomicConnectionState();
|
||||
private final AtomicBoolean closeSent = new AtomicBoolean();
|
||||
private final AtomicBoolean closeSent = new AtomicBoolean(false);
|
||||
private final AtomicBoolean closeNotified = new AtomicBoolean(false);
|
||||
|
||||
// The websocket endpoint object itself
|
||||
private final Object endpoint;
|
||||
|
@ -338,6 +339,16 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
|
|||
return this.containerScope;
|
||||
}
|
||||
|
||||
public Object getEndpoint()
|
||||
{
|
||||
Object ret = endpoint;
|
||||
while (ret instanceof ManagedEndpoint)
|
||||
{
|
||||
ret = ((ManagedEndpoint) ret).getRawEndpoint();
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
public Executor getExecutor()
|
||||
{
|
||||
return executor;
|
||||
|
@ -458,6 +469,11 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
|
|||
{
|
||||
try (ThreadClassLoaderScope scope = new ThreadClassLoaderScope(classLoader))
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("incomingFrame({}, {}) - this.state={}, connectionState={}, endpointFunctions={}",
|
||||
frame, callback, getState(), connectionState.get(), endpointFunctions);
|
||||
}
|
||||
if (connectionState.get() != AtomicConnectionState.State.CLOSED)
|
||||
{
|
||||
// For endpoints that want to see raw frames.
|
||||
|
@ -469,19 +485,23 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
|
|||
{
|
||||
case OpCode.CLOSE:
|
||||
{
|
||||
CloseInfo closeInfo = null;
|
||||
|
||||
if (connectionState.onClosing())
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("ConnectionState: Transition to CLOSING");
|
||||
CloseFrame closeframe = (CloseFrame) frame;
|
||||
closeInfo = new CloseInfo(closeframe, true);
|
||||
CloseInfo closeInfo = new CloseInfo(closeframe, true);
|
||||
notifyClose(closeInfo);
|
||||
close(closeInfo, onDisconnectCallback);
|
||||
}
|
||||
else if (connectionState.onClosed())
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("ConnectionState: Transition to CLOSED");
|
||||
CloseFrame closeframe = (CloseFrame) frame;
|
||||
CloseInfo closeInfo = new CloseInfo(closeframe, true);
|
||||
notifyClose(closeInfo);
|
||||
connection.disconnect();
|
||||
}
|
||||
else
|
||||
|
@ -490,12 +510,6 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
|
|||
LOG.debug("ConnectionState: {} - Close Frame Received", connectionState);
|
||||
}
|
||||
|
||||
if (closeInfo != null)
|
||||
{
|
||||
notifyClose(closeInfo.getStatusCode(), closeInfo.getReason());
|
||||
close(closeInfo, onDisconnectCallback);
|
||||
}
|
||||
|
||||
// let fill/parse continue
|
||||
callback.succeed();
|
||||
|
||||
|
@ -601,16 +615,19 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
|
|||
return "wss".equalsIgnoreCase(requestURI.getScheme());
|
||||
}
|
||||
|
||||
public void notifyClose(int statusCode, String reason)
|
||||
public void notifyClose(CloseInfo closeInfo)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("notifyClose({},{}) [{}]", statusCode, reason, getState());
|
||||
LOG.debug("notifyClose({}) closeNotified={} [{}]", closeInfo, closeNotified.get(), getState());
|
||||
}
|
||||
|
||||
CloseInfo closeInfo = new CloseInfo(statusCode, reason);
|
||||
// only notify once
|
||||
if (closeNotified.compareAndSet(false, true))
|
||||
{
|
||||
endpointFunctions.onClose(closeInfo);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Error Event.
|
||||
|
|
|
@ -25,19 +25,21 @@ import java.util.concurrent.TimeUnit;
|
|||
import javax.servlet.ServletContextEvent;
|
||||
import javax.servlet.ServletContextListener;
|
||||
import javax.websocket.DeploymentException;
|
||||
import javax.websocket.Endpoint;
|
||||
import javax.websocket.EndpointConfig;
|
||||
import javax.websocket.MessageHandler;
|
||||
import javax.websocket.server.ServerEndpointConfig;
|
||||
|
||||
import org.eclipse.jetty.toolchain.test.TestingDir;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.webapp.WebAppContext;
|
||||
import org.eclipse.jetty.websocket.api.Session;
|
||||
import org.eclipse.jetty.websocket.client.WebSocketClient;
|
||||
import org.eclipse.jetty.websocket.jsr356.server.ServerContainer;
|
||||
import org.eclipse.jetty.websocket.tests.LeakTrackingBufferPoolRule;
|
||||
import org.eclipse.jetty.websocket.tests.TrackingEndpoint;
|
||||
import org.eclipse.jetty.websocket.tests.WSServer;
|
||||
import org.eclipse.jetty.websocket.tests.client.jsr356.JsrClientTrackingSocket;
|
||||
import org.eclipse.jetty.websocket.tests.jsr356.AbstractJsrTrackingEndpoint;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
|
@ -46,15 +48,16 @@ import org.junit.Test;
|
|||
* Example of an {@link javax.websocket.Endpoint} extended echo server added programmatically via the
|
||||
* {@link ServerContainer#addEndpoint(javax.websocket.server.ServerEndpointConfig)}
|
||||
*/
|
||||
public class BasicEndpointTest
|
||||
public class EndpointViaConfigTest
|
||||
{
|
||||
public static class BasicEchoEndpoint extends Endpoint implements MessageHandler.Whole<String>
|
||||
{
|
||||
private javax.websocket.Session session;
|
||||
private static final Logger LOG = Log.getLogger(EndpointViaConfigTest.class);
|
||||
|
||||
public static class BasicEchoEndpoint extends AbstractJsrTrackingEndpoint implements MessageHandler.Whole<String>
|
||||
{
|
||||
@Override
|
||||
public void onMessage(String msg)
|
||||
{
|
||||
super.onWsText(msg);
|
||||
// reply with echo
|
||||
session.getAsyncRemote().sendText(msg);
|
||||
}
|
||||
|
@ -62,7 +65,7 @@ public class BasicEndpointTest
|
|||
@Override
|
||||
public void onOpen(javax.websocket.Session session, EndpointConfig config)
|
||||
{
|
||||
this.session = session;
|
||||
super.onOpen(session, config);
|
||||
this.session.addMessageHandler(this);
|
||||
}
|
||||
}
|
||||
|
@ -124,7 +127,7 @@ public class BasicEndpointTest
|
|||
try
|
||||
{
|
||||
client.start();
|
||||
JsrClientTrackingSocket clientSocket = new JsrClientTrackingSocket();
|
||||
TrackingEndpoint clientSocket = new TrackingEndpoint("Client");
|
||||
Future<Session> clientConnectFuture = client.connect(clientSocket,uri.resolve("/app/echo"));
|
||||
// wait for connect
|
||||
Session clientSession = clientConnectFuture.get(5,TimeUnit.SECONDS);
|
||||
|
@ -134,15 +137,18 @@ public class BasicEndpointTest
|
|||
Assert.assertEquals("Expected message","Hello World",incomingMessage);
|
||||
|
||||
clientSession.close();
|
||||
clientSocket.awaitCloseEvent("Client");
|
||||
}
|
||||
finally
|
||||
{
|
||||
client.stop();
|
||||
LOG.debug("Stopped - " + client);
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
wsb.stop();
|
||||
LOG.debug("Stopped - " + wsb);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -7,6 +7,6 @@
|
|||
version="3.0">
|
||||
|
||||
<listener>
|
||||
<listener-class>org.eclipse.jetty.websocket.tests.server.jsr356.BasicEndpointTest$BasicEchoEndpointConfigContextListener</listener-class>
|
||||
<listener-class>org.eclipse.jetty.websocket.tests.server.jsr356.EndpointViaConfigTest$BasicEchoEndpointConfigContextListener</listener-class>
|
||||
</listener>
|
||||
</web-app>
|
Loading…
Reference in New Issue