Issue #3379 - Add tracking of WebSocket Sessions to various WebSocket Container APIs
+ Jetty WebSocket API now tracks Sessions and will close them on lifecycle stop + Javax WebSocket API now tracks Sessions and will close them on lifecycle stop + Adding Jetty WebSocket tests for proper close / session tracking + Disabling tests that need triage Signed-off-by: Joakim Erdfelt <joakim.erdfelt@gmail.com>
This commit is contained in:
parent
47b84fe548
commit
d8a9f2619b
|
@ -18,11 +18,13 @@
|
|||
|
||||
package org.eclipse.jetty.websocket.javax.common;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
import java.util.function.Consumer;
|
||||
import javax.websocket.Extension;
|
||||
import javax.websocket.Session;
|
||||
import javax.websocket.WebSocketContainer;
|
||||
|
@ -30,13 +32,24 @@ import javax.websocket.WebSocketContainer;
|
|||
import org.eclipse.jetty.io.ByteBufferPool;
|
||||
import org.eclipse.jetty.util.DecoratedObjectFactory;
|
||||
import org.eclipse.jetty.util.component.ContainerLifeCycle;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.websocket.core.WebSocketExtensionRegistry;
|
||||
|
||||
public abstract class JavaxWebSocketContainer extends ContainerLifeCycle implements javax.websocket.WebSocketContainer
|
||||
{
|
||||
private final static Logger LOG = Log.getLogger(JavaxWebSocketContainer.class);
|
||||
private final SessionTracker sessionTracker = new SessionTracker();
|
||||
private long defaultAsyncSendTimeout = -1;
|
||||
private int defaultMaxBinaryMessageBufferSize = 64 * 1024;
|
||||
private int defaultMaxTextMessageBufferSize = 64 * 1024;
|
||||
private List<JavaxWebSocketSessionListener> sessionListeners = new ArrayList<>();
|
||||
|
||||
public JavaxWebSocketContainer()
|
||||
{
|
||||
addSessionListener(sessionTracker);
|
||||
addBean(sessionTracker);
|
||||
}
|
||||
|
||||
public abstract ByteBufferPool getBufferPool();
|
||||
|
||||
|
@ -100,7 +113,7 @@ public abstract class JavaxWebSocketContainer extends ContainerLifeCycle impleme
|
|||
*/
|
||||
public Set<javax.websocket.Session> getOpenSessions()
|
||||
{
|
||||
return new HashSet<>(getBeans(JavaxWebSocketSession.class));
|
||||
return sessionTracker.getSessions();
|
||||
}
|
||||
|
||||
public JavaxWebSocketFrameHandler newFrameHandler(Object websocketPojo, UpgradeRequest upgradeRequest, UpgradeResponse upgradeResponse,
|
||||
|
@ -118,4 +131,45 @@ public abstract class JavaxWebSocketContainer extends ContainerLifeCycle impleme
|
|||
protected abstract WebSocketExtensionRegistry getExtensionRegistry();
|
||||
|
||||
protected abstract JavaxWebSocketFrameHandlerFactory getFrameHandlerFactory();
|
||||
|
||||
/**
|
||||
* Register a WebSocketSessionListener with the container
|
||||
*
|
||||
* @param listener the listener
|
||||
*/
|
||||
public void addSessionListener(JavaxWebSocketSessionListener listener)
|
||||
{
|
||||
sessionListeners.add(listener);
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove a WebSocketSessionListener from the container
|
||||
*
|
||||
* @param listener the listener
|
||||
* @return true if listener was present and removed
|
||||
*/
|
||||
public boolean removeSessionListener(JavaxWebSocketSessionListener listener)
|
||||
{
|
||||
return sessionListeners.remove(listener);
|
||||
}
|
||||
|
||||
/**
|
||||
* Notify Session Listeners of events
|
||||
*
|
||||
* @param consumer the consumer to pass to each listener
|
||||
*/
|
||||
public void notifySessionListeners(Consumer<JavaxWebSocketSessionListener> consumer)
|
||||
{
|
||||
for (JavaxWebSocketSessionListener listener : sessionListeners)
|
||||
{
|
||||
try
|
||||
{
|
||||
consumer.accept(listener);
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
LOG.info("Exception while invoking listener " + listener, x);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -29,7 +29,6 @@ import java.util.Optional;
|
|||
import java.util.Set;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import javax.websocket.CloseReason;
|
||||
import javax.websocket.Decoder;
|
||||
import javax.websocket.EndpointConfig;
|
||||
|
@ -229,7 +228,7 @@ public class JavaxWebSocketFrameHandler implements FrameHandler
|
|||
if (openHandle != null)
|
||||
openHandle.invoke();
|
||||
|
||||
container.addBean(session, true);
|
||||
container.notifySessionListeners((listener) -> listener.onJavaxWebSocketSessionOpened(session));
|
||||
callback.succeeded();
|
||||
futureSession.complete(session);
|
||||
}
|
||||
|
@ -283,8 +282,8 @@ public class JavaxWebSocketFrameHandler implements FrameHandler
|
|||
CloseReason closeReason = new CloseReason(CloseReason.CloseCodes.getCloseCode(closeStatus.getCode()), closeStatus.getReason());
|
||||
closeHandle.invoke(closeReason);
|
||||
}
|
||||
container.removeBean(session);
|
||||
callback.succeeded();
|
||||
container.notifySessionListeners((listener) -> listener.onJavaxWebSocketSessionClosed(session));
|
||||
}
|
||||
catch (Throwable cause)
|
||||
{
|
||||
|
|
|
@ -29,7 +29,6 @@ import java.util.Map;
|
|||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import javax.websocket.CloseReason;
|
||||
import javax.websocket.EndpointConfig;
|
||||
import javax.websocket.Extension;
|
||||
|
@ -39,10 +38,12 @@ import javax.websocket.RemoteEndpoint.Basic;
|
|||
import javax.websocket.Session;
|
||||
import javax.websocket.WebSocketContainer;
|
||||
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.SharedBlockingCallback;
|
||||
import org.eclipse.jetty.util.component.AbstractLifeCycle;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.websocket.core.CloseStatus;
|
||||
import org.eclipse.jetty.websocket.core.ExtensionConfig;
|
||||
import org.eclipse.jetty.websocket.core.FrameHandler;
|
||||
import org.eclipse.jetty.websocket.javax.common.decoders.AvailableDecoders;
|
||||
|
@ -551,6 +552,25 @@ public class JavaxWebSocketSession extends AbstractLifeCycle implements javax.we
|
|||
return coreSession.isSecure();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStop()
|
||||
{
|
||||
coreSession.close(CloseStatus.SHUTDOWN, "Container being shut down", new Callback()
|
||||
{
|
||||
@Override
|
||||
public void succeeded()
|
||||
{
|
||||
coreSession.abort();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(Throwable x)
|
||||
{
|
||||
coreSession.abort();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void removeMessageHandler(MessageHandler handler)
|
||||
{
|
||||
|
|
|
@ -0,0 +1,26 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.websocket.javax.common;
|
||||
|
||||
public interface JavaxWebSocketSessionListener
|
||||
{
|
||||
void onJavaxWebSocketSessionOpened(JavaxWebSocketSession session);
|
||||
|
||||
void onJavaxWebSocketSessionClosed(JavaxWebSocketSession session);
|
||||
}
|
|
@ -0,0 +1,59 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.websocket.javax.common;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CopyOnWriteArraySet;
|
||||
import javax.websocket.Session;
|
||||
|
||||
import org.eclipse.jetty.util.component.AbstractLifeCycle;
|
||||
import org.eclipse.jetty.util.component.LifeCycle;
|
||||
|
||||
public class SessionTracker extends AbstractLifeCycle implements JavaxWebSocketSessionListener
|
||||
{
|
||||
private CopyOnWriteArraySet<JavaxWebSocketSession> sessions = new CopyOnWriteArraySet<>();
|
||||
|
||||
public Set<Session> getSessions()
|
||||
{
|
||||
return Collections.unmodifiableSet(sessions);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onJavaxWebSocketSessionOpened(JavaxWebSocketSession session)
|
||||
{
|
||||
sessions.add(session);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onJavaxWebSocketSessionClosed(JavaxWebSocketSession session)
|
||||
{
|
||||
sessions.remove(sessions);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStop() throws Exception
|
||||
{
|
||||
for (JavaxWebSocketSession session : sessions)
|
||||
{
|
||||
LifeCycle.stop(session);
|
||||
}
|
||||
super.doStop();
|
||||
}
|
||||
}
|
|
@ -24,16 +24,19 @@ import java.net.SocketAddress;
|
|||
import java.net.URI;
|
||||
import java.time.Duration;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import org.eclipse.jetty.client.HttpClient;
|
||||
import org.eclipse.jetty.io.ByteBufferPool;
|
||||
import org.eclipse.jetty.util.DecoratedObjectFactory;
|
||||
import org.eclipse.jetty.util.component.ContainerLifeCycle;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.util.ssl.SslContextFactory;
|
||||
import org.eclipse.jetty.websocket.api.Session;
|
||||
import org.eclipse.jetty.websocket.api.UpgradeRequest;
|
||||
|
@ -43,14 +46,20 @@ import org.eclipse.jetty.websocket.api.WebSocketPolicy;
|
|||
import org.eclipse.jetty.websocket.client.impl.JettyClientUpgradeRequest;
|
||||
import org.eclipse.jetty.websocket.common.JettyWebSocketFrameHandler;
|
||||
import org.eclipse.jetty.websocket.common.JettyWebSocketFrameHandlerFactory;
|
||||
import org.eclipse.jetty.websocket.common.SessionTracker;
|
||||
import org.eclipse.jetty.websocket.common.WebSocketContainer;
|
||||
import org.eclipse.jetty.websocket.common.WebSocketSessionListener;
|
||||
import org.eclipse.jetty.websocket.core.WebSocketExtensionRegistry;
|
||||
import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient;
|
||||
|
||||
public class WebSocketClient extends ContainerLifeCycle implements WebSocketPolicy
|
||||
public class WebSocketClient extends ContainerLifeCycle implements WebSocketPolicy, WebSocketContainer
|
||||
{
|
||||
private static final Logger LOG = Log.getLogger(WebSocketClient.class);
|
||||
private final WebSocketCoreClient coreClient;
|
||||
private final int id = ThreadLocalRandom.current().nextInt();
|
||||
private final JettyWebSocketFrameHandlerFactory frameHandlerFactory;
|
||||
private final List<WebSocketSessionListener> sessionListeners = new CopyOnWriteArrayList<>();
|
||||
private final SessionTracker sessionTracker = new SessionTracker();
|
||||
private ClassLoader contextClassLoader;
|
||||
private DecoratedObjectFactory objectFactory;
|
||||
private WebSocketExtensionRegistry extensionRegistry;
|
||||
|
@ -88,7 +97,9 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketPoli
|
|||
this.contextClassLoader = this.getClass().getClassLoader();
|
||||
this.objectFactory = new DecoratedObjectFactory();
|
||||
this.extensionRegistry = new WebSocketExtensionRegistry();
|
||||
this.frameHandlerFactory = new JettyWebSocketFrameHandlerFactory(getExecutor());
|
||||
this.frameHandlerFactory = new JettyWebSocketFrameHandlerFactory(this);
|
||||
this.sessionListeners.add(sessionTracker);
|
||||
addBean(sessionTracker);
|
||||
}
|
||||
|
||||
public CompletableFuture<Session> connect(Object websocket, URI toUri) throws IOException
|
||||
|
@ -124,6 +135,34 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketPoli
|
|||
return WebSocketBehavior.CLIENT;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addSessionListener(WebSocketSessionListener listener)
|
||||
{
|
||||
sessionListeners.add(listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean removeSessionListener(WebSocketSessionListener listener)
|
||||
{
|
||||
return sessionListeners.remove(listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void notifySessionListeners(Consumer<WebSocketSessionListener> consumer)
|
||||
{
|
||||
for (WebSocketSessionListener listener : sessionListeners)
|
||||
{
|
||||
try
|
||||
{
|
||||
consumer.accept(listener);
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
LOG.info("Exception while invoking listener " + listener, x);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Duration getIdleTimeout()
|
||||
{
|
||||
|
@ -224,6 +263,7 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketPoli
|
|||
return getHttpClient().getByteBufferPool();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Executor getExecutor()
|
||||
{
|
||||
return getHttpClient().getExecutor();
|
||||
|
@ -246,7 +286,7 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketPoli
|
|||
|
||||
public Collection<Session> getOpenSessions()
|
||||
{
|
||||
return Collections.unmodifiableSet(new HashSet<>(getBeans(Session.class)));
|
||||
return sessionTracker.getSessions();
|
||||
}
|
||||
|
||||
public JettyWebSocketFrameHandler newFrameHandler(Object websocketPojo, UpgradeRequest upgradeRequest, UpgradeResponse upgradeResponse,
|
||||
|
|
|
@ -46,7 +46,7 @@ import org.eclipse.jetty.websocket.core.WebSocketTimeoutException;
|
|||
public class JettyWebSocketFrameHandler implements FrameHandler
|
||||
{
|
||||
private final Logger log;
|
||||
private final Executor executor;
|
||||
private final WebSocketContainer container;
|
||||
private final Object endpointInstance;
|
||||
private MethodHandle openHandle;
|
||||
private MethodHandle closeHandle;
|
||||
|
@ -73,7 +73,7 @@ public class JettyWebSocketFrameHandler implements FrameHandler
|
|||
private MessageSink activeMessageSink;
|
||||
private WebSocketSessionImpl session;
|
||||
|
||||
public JettyWebSocketFrameHandler(Executor executor,
|
||||
public JettyWebSocketFrameHandler(WebSocketContainer container,
|
||||
Object endpointInstance,
|
||||
UpgradeRequest upgradeRequest, UpgradeResponse upgradeResponse,
|
||||
MethodHandle openHandle, MethodHandle closeHandle, MethodHandle errorHandle,
|
||||
|
@ -87,7 +87,7 @@ public class JettyWebSocketFrameHandler implements FrameHandler
|
|||
{
|
||||
this.log = Log.getLogger(endpointInstance.getClass());
|
||||
|
||||
this.executor = executor;
|
||||
this.container = container;
|
||||
this.endpointInstance = endpointInstance;
|
||||
this.upgradeRequest = upgradeRequest;
|
||||
this.upgradeResponse = upgradeResponse;
|
||||
|
@ -131,6 +131,8 @@ public class JettyWebSocketFrameHandler implements FrameHandler
|
|||
pingHandle = JettyWebSocketFrameHandlerFactory.bindTo(pingHandle, session);
|
||||
pongHandle = JettyWebSocketFrameHandlerFactory.bindTo(pongHandle, session);
|
||||
|
||||
Executor executor = container.getExecutor();
|
||||
|
||||
if (textHandle != null)
|
||||
textSink = JettyWebSocketFrameHandlerFactory.createMessageSink(textHandle, textSinkClass, executor, coreSession.getMaxTextMessageSize());
|
||||
|
||||
|
@ -141,6 +143,8 @@ public class JettyWebSocketFrameHandler implements FrameHandler
|
|||
if (openHandle != null)
|
||||
openHandle.invoke();
|
||||
|
||||
container.notifySessionListeners((listener) -> listener.onWebSocketSessionOpened(session));
|
||||
|
||||
callback.succeeded();
|
||||
futureSession.complete(session);
|
||||
}
|
||||
|
@ -225,6 +229,7 @@ public class JettyWebSocketFrameHandler implements FrameHandler
|
|||
public void onClosed(CloseStatus closeStatus, Callback callback)
|
||||
{
|
||||
callback.succeeded();
|
||||
container.notifySessionListeners((listener) -> listener.onWebSocketSessionClosed(session));
|
||||
}
|
||||
|
||||
public String toString()
|
||||
|
@ -333,7 +338,6 @@ public class JettyWebSocketFrameHandler implements FrameHandler
|
|||
acceptMessage(frame, callback);
|
||||
}
|
||||
|
||||
|
||||
static Throwable convertCause(Throwable cause)
|
||||
{
|
||||
if (cause instanceof MessageTooLargeException)
|
||||
|
|
|
@ -80,13 +80,12 @@ import org.eclipse.jetty.websocket.common.util.ReflectUtils;
|
|||
*/
|
||||
public class JettyWebSocketFrameHandlerFactory extends ContainerLifeCycle
|
||||
{
|
||||
private final Executor executor;
|
||||
private final WebSocketContainer container;
|
||||
private Map<Class<?>, JettyWebSocketFrameHandlerMetadata> metadataMap = new ConcurrentHashMap<>();
|
||||
|
||||
public JettyWebSocketFrameHandlerFactory(Executor executor)
|
||||
public JettyWebSocketFrameHandlerFactory(WebSocketContainer container)
|
||||
{
|
||||
this.executor = executor;
|
||||
addBean(executor);
|
||||
this.container = container;
|
||||
}
|
||||
|
||||
public JettyWebSocketFrameHandlerMetadata getMetadata(Class<?> endpointClass)
|
||||
|
@ -148,7 +147,7 @@ public class JettyWebSocketFrameHandlerFactory extends ContainerLifeCycle
|
|||
future = new CompletableFuture<>();
|
||||
|
||||
JettyWebSocketFrameHandler frameHandler = new JettyWebSocketFrameHandler(
|
||||
executor,
|
||||
container,
|
||||
endpointInstance,
|
||||
upgradeRequest, upgradeResponse,
|
||||
openHandle, closeHandle, errorHandle,
|
||||
|
|
|
@ -0,0 +1,59 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.websocket.common;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
|
||||
import org.eclipse.jetty.util.component.AbstractLifeCycle;
|
||||
import org.eclipse.jetty.util.component.LifeCycle;
|
||||
import org.eclipse.jetty.websocket.api.Session;
|
||||
|
||||
public class SessionTracker extends AbstractLifeCycle implements WebSocketSessionListener
|
||||
{
|
||||
private List<Session> sessions = new CopyOnWriteArrayList<>();
|
||||
|
||||
public Collection<Session> getSessions()
|
||||
{
|
||||
return sessions;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onWebSocketSessionOpened(WebSocketSessionImpl session)
|
||||
{
|
||||
sessions.add(session);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onWebSocketSessionClosed(WebSocketSessionImpl session)
|
||||
{
|
||||
sessions.remove(session);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStop() throws Exception
|
||||
{
|
||||
for (Session session : sessions)
|
||||
{
|
||||
LifeCycle.stop(session);
|
||||
}
|
||||
super.doStop();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,65 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.websocket.common;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import org.eclipse.jetty.websocket.api.Session;
|
||||
|
||||
/**
|
||||
* Generic interface to the Container (server or client) that jetty-websocket-common can use
|
||||
*/
|
||||
public interface WebSocketContainer
|
||||
{
|
||||
/**
|
||||
* The Container provided Executor.
|
||||
*/
|
||||
Executor getExecutor();
|
||||
|
||||
/**
|
||||
* Get the collection of open Sessions being tracked by this container
|
||||
*
|
||||
* @return the collection of open sessions
|
||||
*/
|
||||
Collection<Session> getOpenSessions();
|
||||
|
||||
/**
|
||||
* Register a WebSocketSessionListener with the container
|
||||
*
|
||||
* @param listener the listener
|
||||
*/
|
||||
void addSessionListener(WebSocketSessionListener listener);
|
||||
|
||||
/**
|
||||
* Remove a WebSocketSessionListener from the container
|
||||
*
|
||||
* @param listener the listener
|
||||
* @return true if listener was present and removed
|
||||
*/
|
||||
boolean removeSessionListener(WebSocketSessionListener listener);
|
||||
|
||||
/**
|
||||
* Notify the Session Listeners of an event.
|
||||
*
|
||||
* @param consumer the consumer to call for each tracked listener
|
||||
*/
|
||||
void notifySessionListeners(Consumer<WebSocketSessionListener> consumer);
|
||||
}
|
|
@ -23,17 +23,23 @@ import java.net.SocketAddress;
|
|||
import java.time.Duration;
|
||||
import java.util.Objects;
|
||||
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.component.AbstractLifeCycle;
|
||||
import org.eclipse.jetty.util.component.Dumpable;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.websocket.api.CloseStatus;
|
||||
import org.eclipse.jetty.websocket.api.Session;
|
||||
import org.eclipse.jetty.websocket.api.StatusCode;
|
||||
import org.eclipse.jetty.websocket.api.SuspendToken;
|
||||
import org.eclipse.jetty.websocket.api.UpgradeRequest;
|
||||
import org.eclipse.jetty.websocket.api.UpgradeResponse;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketBehavior;
|
||||
import org.eclipse.jetty.websocket.core.FrameHandler;
|
||||
|
||||
public class WebSocketSessionImpl implements Session, Dumpable
|
||||
public class WebSocketSessionImpl extends AbstractLifeCycle implements Session, Dumpable
|
||||
{
|
||||
private static final Logger LOG = Log.getLogger(WebSocketSessionImpl.class);
|
||||
private final FrameHandler.CoreSession coreSession;
|
||||
private final JettyWebSocketFrameHandler frameHandler;
|
||||
private final JettyWebSocketRemoteEndpoint remoteEndpoint;
|
||||
|
@ -206,6 +212,30 @@ public class WebSocketSessionImpl implements Session, Dumpable
|
|||
return null;
|
||||
}
|
||||
|
||||
public FrameHandler.CoreSession getCoreSession()
|
||||
{
|
||||
return coreSession;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStop() throws Exception
|
||||
{
|
||||
coreSession.close(StatusCode.SHUTDOWN, "Container being shut down", new Callback()
|
||||
{
|
||||
@Override
|
||||
public void succeeded()
|
||||
{
|
||||
coreSession.abort();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(Throwable x)
|
||||
{
|
||||
coreSession.abort();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void dump(Appendable out, String indent) throws IOException
|
||||
{
|
||||
|
|
|
@ -0,0 +1,29 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.websocket.common;
|
||||
|
||||
/**
|
||||
* Interface for Listeners that are interested in knowing about the WebSocketSession history.
|
||||
*/
|
||||
public interface WebSocketSessionListener
|
||||
{
|
||||
void onWebSocketSessionOpened(WebSocketSessionImpl session);
|
||||
|
||||
void onWebSocketSessionClosed(WebSocketSessionImpl session);
|
||||
}
|
|
@ -0,0 +1,86 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.websocket.common;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import org.eclipse.jetty.util.component.ContainerLifeCycle;
|
||||
import org.eclipse.jetty.util.thread.QueuedThreadPool;
|
||||
import org.eclipse.jetty.websocket.api.Session;
|
||||
|
||||
public class DummyContainer extends ContainerLifeCycle implements WebSocketContainer
|
||||
{
|
||||
private final QueuedThreadPool executor;
|
||||
private final List<WebSocketSessionListener> sessionListeners = new ArrayList<>();
|
||||
private final SessionTracker sessionTracker = new SessionTracker();
|
||||
|
||||
public DummyContainer()
|
||||
{
|
||||
executor = new QueuedThreadPool();
|
||||
executor.setName("dummy-container");
|
||||
addBean(executor);
|
||||
|
||||
addSessionListener(sessionTracker);
|
||||
addBean(sessionTracker);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Executor getExecutor()
|
||||
{
|
||||
return executor;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<Session> getOpenSessions()
|
||||
{
|
||||
return sessionTracker.getSessions();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addSessionListener(WebSocketSessionListener listener)
|
||||
{
|
||||
sessionListeners.add(listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean removeSessionListener(WebSocketSessionListener listener)
|
||||
{
|
||||
return sessionListeners.remove(listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void notifySessionListeners(Consumer<WebSocketSessionListener> consumer)
|
||||
{
|
||||
for (WebSocketSessionListener listener : sessionListeners)
|
||||
{
|
||||
try
|
||||
{
|
||||
consumer.accept(listener);
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
x.printStackTrace(System.err);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -27,7 +27,6 @@ import java.util.concurrent.TimeUnit;
|
|||
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.IO;
|
||||
import org.eclipse.jetty.util.thread.QueuedThreadPool;
|
||||
import org.eclipse.jetty.websocket.api.Session;
|
||||
import org.eclipse.jetty.websocket.api.StatusCode;
|
||||
import org.eclipse.jetty.websocket.api.UpgradeRequest;
|
||||
|
@ -56,21 +55,22 @@ import static org.junit.jupiter.api.Assertions.assertTimeout;
|
|||
|
||||
public class JettyWebSocketFrameHandlerTest
|
||||
{
|
||||
private static QueuedThreadPool executor = new QueuedThreadPool();
|
||||
private static DummyContainer container;
|
||||
|
||||
@BeforeAll
|
||||
public static void startContainer() throws Exception
|
||||
{
|
||||
executor.start();
|
||||
container = new DummyContainer();
|
||||
container.start();
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
public static void stopContainer() throws Exception
|
||||
{
|
||||
executor.stop();
|
||||
container.stop();
|
||||
}
|
||||
|
||||
private JettyWebSocketFrameHandlerFactory endpointFactory = new JettyWebSocketFrameHandlerFactory(executor);
|
||||
private JettyWebSocketFrameHandlerFactory endpointFactory = new JettyWebSocketFrameHandlerFactory(container);
|
||||
private FrameHandler.CoreSession channel = new FrameHandler.CoreSession.Empty()
|
||||
{
|
||||
@Override
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
|
||||
package org.eclipse.jetty.websocket.common;
|
||||
|
||||
import org.eclipse.jetty.util.thread.QueuedThreadPool;
|
||||
import org.eclipse.jetty.websocket.api.InvalidWebSocketException;
|
||||
import org.eclipse.jetty.websocket.common.endpoints.annotated.AnnotatedBinaryArraySocket;
|
||||
import org.eclipse.jetty.websocket.common.endpoints.annotated.AnnotatedBinaryStreamSocket;
|
||||
|
@ -55,22 +54,22 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
|
|||
public class LocalEndpointMetadataTest
|
||||
{
|
||||
public static final Matcher<Object> EXISTS = notNullValue();
|
||||
public static QueuedThreadPool threadpool;
|
||||
public static DummyContainer container;
|
||||
|
||||
@BeforeAll
|
||||
public static void startContainer() throws Exception
|
||||
{
|
||||
threadpool = new QueuedThreadPool();
|
||||
threadpool.start();
|
||||
container = new DummyContainer();
|
||||
container.start();
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
public static void stopContainer() throws Exception
|
||||
{
|
||||
threadpool.stop();
|
||||
container.stop();
|
||||
}
|
||||
|
||||
private JettyWebSocketFrameHandlerFactory endpointFactory = new JettyWebSocketFrameHandlerFactory(threadpool);
|
||||
private JettyWebSocketFrameHandlerFactory endpointFactory = new JettyWebSocketFrameHandlerFactory(container);
|
||||
|
||||
private JettyWebSocketFrameHandlerMetadata createMetadata(Class<?> endpointClass)
|
||||
{
|
||||
|
|
|
@ -19,8 +19,6 @@
|
|||
package org.eclipse.jetty.websocket.server;
|
||||
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
import javax.servlet.ServletContext;
|
||||
import javax.servlet.ServletException;
|
||||
|
||||
|
@ -28,8 +26,10 @@ import org.eclipse.jetty.server.handler.ContextHandler;
|
|||
import org.eclipse.jetty.servlet.ServletContextHandler;
|
||||
import org.eclipse.jetty.util.component.LifeCycle;
|
||||
import org.eclipse.jetty.websocket.common.JettyWebSocketFrameHandlerFactory;
|
||||
import org.eclipse.jetty.websocket.common.WebSocketContainer;
|
||||
import org.eclipse.jetty.websocket.core.FrameHandler;
|
||||
import org.eclipse.jetty.websocket.server.internal.DelegatedJettyServletUpgradeRequest;
|
||||
import org.eclipse.jetty.websocket.server.internal.JettyWebSocketServerContainer;
|
||||
import org.eclipse.jetty.websocket.server.internal.UpgradeResponseAdapter;
|
||||
import org.eclipse.jetty.websocket.servlet.FrameHandlerFactory;
|
||||
import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
|
||||
|
@ -47,21 +47,18 @@ public class JettyServerFrameHandlerFactory
|
|||
JettyServerFrameHandlerFactory factory = contextHandler.getBean(JettyServerFrameHandlerFactory.class);
|
||||
if (factory == null)
|
||||
{
|
||||
Executor executor = (Executor)servletContext
|
||||
.getAttribute("org.eclipse.jetty.server.Executor");
|
||||
if (executor == null)
|
||||
executor = contextHandler.getServer().getThreadPool();
|
||||
|
||||
factory = new JettyServerFrameHandlerFactory(executor);
|
||||
JettyWebSocketServerContainer container = new JettyWebSocketServerContainer(contextHandler);
|
||||
servletContext.setAttribute(WebSocketContainer.class.getName(), container);
|
||||
factory = new JettyServerFrameHandlerFactory(container);
|
||||
contextHandler.addManaged(factory);
|
||||
contextHandler.addLifeCycleListener(factory);
|
||||
}
|
||||
return factory;
|
||||
}
|
||||
|
||||
public JettyServerFrameHandlerFactory(Executor executor)
|
||||
public JettyServerFrameHandlerFactory(WebSocketContainer container)
|
||||
{
|
||||
super(executor);
|
||||
super(container);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -0,0 +1,99 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.websocket.server.internal;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import org.eclipse.jetty.server.handler.ContextHandler;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.util.thread.QueuedThreadPool;
|
||||
import org.eclipse.jetty.websocket.api.Session;
|
||||
import org.eclipse.jetty.websocket.common.SessionTracker;
|
||||
import org.eclipse.jetty.websocket.common.WebSocketContainer;
|
||||
import org.eclipse.jetty.websocket.common.WebSocketSessionListener;
|
||||
|
||||
public class JettyWebSocketServerContainer implements WebSocketContainer
|
||||
{
|
||||
private final static Logger LOG = Log.getLogger(JettyWebSocketServerContainer.class);
|
||||
private final Executor executor;
|
||||
private final List<WebSocketSessionListener> sessionListeners = new ArrayList<>();
|
||||
private final SessionTracker sessionTracker = new SessionTracker();
|
||||
|
||||
public JettyWebSocketServerContainer(ContextHandler handler)
|
||||
{
|
||||
Executor executor = (Executor) handler
|
||||
.getAttribute("org.eclipse.jetty.server.Executor");
|
||||
if (executor == null)
|
||||
{
|
||||
executor = handler.getServer().getThreadPool();
|
||||
}
|
||||
if (executor == null)
|
||||
{
|
||||
executor = new QueuedThreadPool(); // default settings
|
||||
}
|
||||
this.executor = executor;
|
||||
addSessionListener(sessionTracker);
|
||||
handler.addBean(sessionTracker);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Executor getExecutor()
|
||||
{
|
||||
return this.executor;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addSessionListener(WebSocketSessionListener listener)
|
||||
{
|
||||
sessionListeners.add(listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean removeSessionListener(WebSocketSessionListener listener)
|
||||
{
|
||||
return sessionListeners.remove(listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void notifySessionListeners(Consumer<WebSocketSessionListener> consumer)
|
||||
{
|
||||
for (WebSocketSessionListener listener : sessionListeners)
|
||||
{
|
||||
try
|
||||
{
|
||||
consumer.accept(listener);
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
LOG.info("Exception while invoking listener " + listener, x);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<Session> getOpenSessions()
|
||||
{
|
||||
return sessionTracker.getSessions();
|
||||
}
|
||||
}
|
|
@ -8,15 +8,15 @@
|
|||
<version>10.0.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<artifactId>jetty-websocket-tests</artifactId>
|
||||
<name>Jetty :: Websocket :: org.eclipse.jetty.websocket :: Tests</name>
|
||||
|
||||
<properties>
|
||||
<properties>
|
||||
<bundle-symbolic-name>${project.groupId}.jetty.websocket.tests</bundle-symbolic-name>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.eclipse.jetty.websocket</groupId>
|
||||
<artifactId>jetty-websocket-api</artifactId>
|
||||
|
@ -44,7 +44,7 @@
|
|||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.felix</groupId>
|
||||
|
|
|
@ -0,0 +1,128 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.websocket.tests;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.eclipse.jetty.io.EndPoint;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.websocket.api.Session;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
|
||||
import org.eclipse.jetty.websocket.common.WebSocketSessionImpl;
|
||||
import org.eclipse.jetty.websocket.core.internal.WebSocketChannel;
|
||||
import org.eclipse.jetty.websocket.core.internal.WebSocketConnection;
|
||||
import org.hamcrest.Matcher;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
|
||||
public class CloseTrackingEndpoint extends WebSocketAdapter
|
||||
{
|
||||
private static final Logger LOG = Log.getLogger(CloseTrackingEndpoint.class);
|
||||
|
||||
public int closeCode = -1;
|
||||
public String closeReason = null;
|
||||
public CountDownLatch closeLatch = new CountDownLatch(1);
|
||||
public AtomicInteger closeCount = new AtomicInteger(0);
|
||||
public CountDownLatch openLatch = new CountDownLatch(1);
|
||||
public CountDownLatch errorLatch = new CountDownLatch(1);
|
||||
|
||||
public LinkedBlockingQueue<String> messageQueue = new LinkedBlockingQueue<>();
|
||||
public AtomicReference<Throwable> error = new AtomicReference<>();
|
||||
|
||||
public void assertReceivedCloseEvent(int clientTimeoutMs, Matcher<Integer> statusCodeMatcher)
|
||||
throws InterruptedException
|
||||
{
|
||||
assertReceivedCloseEvent(clientTimeoutMs, statusCodeMatcher, null);
|
||||
}
|
||||
|
||||
public void assertReceivedCloseEvent(int clientTimeoutMs, Matcher<Integer> statusCodeMatcher, Matcher<String> reasonMatcher)
|
||||
throws InterruptedException
|
||||
{
|
||||
assertThat("Client Close Event Occurred", closeLatch.await(clientTimeoutMs, TimeUnit.MILLISECONDS), is(true));
|
||||
assertThat("Client Close Event Count", closeCount.get(), is(1));
|
||||
assertThat("Client Close Event Status Code", closeCode, statusCodeMatcher);
|
||||
if (reasonMatcher == null)
|
||||
{
|
||||
assertThat("Client Close Event Reason", closeReason, nullValue());
|
||||
}
|
||||
else
|
||||
{
|
||||
assertThat("Client Close Event Reason", closeReason, reasonMatcher);
|
||||
}
|
||||
}
|
||||
|
||||
public void clearQueues()
|
||||
{
|
||||
messageQueue.clear();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onWebSocketClose(int statusCode, String reason)
|
||||
{
|
||||
LOG.debug("onWebSocketClose({},{})", statusCode, reason);
|
||||
super.onWebSocketClose(statusCode, reason);
|
||||
closeCount.incrementAndGet();
|
||||
closeCode = statusCode;
|
||||
closeReason = reason;
|
||||
closeLatch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onWebSocketConnect(Session session)
|
||||
{
|
||||
LOG.debug("onWebSocketConnect({})", session);
|
||||
super.onWebSocketConnect(session);
|
||||
openLatch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onWebSocketError(Throwable cause)
|
||||
{
|
||||
LOG.debug("onWebSocketError", cause);
|
||||
assertThat("Unique Error Event", error.compareAndSet(null, cause), is(true));
|
||||
errorLatch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onWebSocketText(String message)
|
||||
{
|
||||
LOG.debug("onWebSocketText({})", message);
|
||||
messageQueue.offer(message);
|
||||
}
|
||||
|
||||
public EndPoint getEndPoint()
|
||||
{
|
||||
Session session = getSession();
|
||||
assertThat("Session type", session, instanceOf(WebSocketSessionImpl.class));
|
||||
|
||||
WebSocketSessionImpl wsSession = (WebSocketSessionImpl) session;
|
||||
WebSocketChannel wsChannel = (WebSocketChannel) wsSession.getCoreSession();
|
||||
WebSocketConnection wsConnection = wsChannel.getConnection();
|
||||
|
||||
return wsConnection.getEndPoint();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,37 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.websocket.tests;
|
||||
|
||||
import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
|
||||
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
|
||||
import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
|
||||
|
||||
public class EchoCreator implements WebSocketCreator
|
||||
{
|
||||
@Override
|
||||
public Object createWebSocket(ServletUpgradeRequest req, ServletUpgradeResponse resp)
|
||||
{
|
||||
if (req.hasSubProtocol("echo"))
|
||||
{
|
||||
resp.setAcceptedSubProtocol("echo");
|
||||
}
|
||||
|
||||
return new EchoSocket();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,35 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.websocket.tests;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.eclipse.jetty.websocket.api.Session;
|
||||
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
|
||||
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
|
||||
|
||||
@WebSocket
|
||||
public class EchoSocket
|
||||
{
|
||||
@OnWebSocketMessage
|
||||
public void onMessage(Session session, String msg) throws IOException
|
||||
{
|
||||
session.getRemote().sendString(msg);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,205 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.websocket.tests.client;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.time.Duration;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.eclipse.jetty.server.Server;
|
||||
import org.eclipse.jetty.server.ServerConnector;
|
||||
import org.eclipse.jetty.server.handler.DefaultHandler;
|
||||
import org.eclipse.jetty.server.handler.HandlerList;
|
||||
import org.eclipse.jetty.servlet.ServletContextHandler;
|
||||
import org.eclipse.jetty.servlet.ServletHolder;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.websocket.api.Session;
|
||||
import org.eclipse.jetty.websocket.api.StatusCode;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketListener;
|
||||
import org.eclipse.jetty.websocket.api.util.WSURI;
|
||||
import org.eclipse.jetty.websocket.client.WebSocketClient;
|
||||
import org.eclipse.jetty.websocket.server.JettyWebSocketServletContainerInitializer;
|
||||
import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
|
||||
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
|
||||
import org.eclipse.jetty.websocket.tests.CloseTrackingEndpoint;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Disabled;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
|
||||
/**
|
||||
* Tests for conditions due to bad networking.
|
||||
*/
|
||||
@Disabled("Needs triage")
|
||||
public class BadNetworkTest
|
||||
{
|
||||
private Server server;
|
||||
private WebSocketClient client;
|
||||
|
||||
@BeforeEach
|
||||
public void startClient() throws Exception
|
||||
{
|
||||
client = new WebSocketClient();
|
||||
client.setIdleTimeout(Duration.ofMillis(500));
|
||||
client.start();
|
||||
}
|
||||
|
||||
@BeforeEach
|
||||
public void startServer() throws Exception
|
||||
{
|
||||
server = new Server();
|
||||
|
||||
ServerConnector connector = new ServerConnector(server);
|
||||
connector.setPort(0);
|
||||
server.addConnector(connector);
|
||||
|
||||
ServletContextHandler context = new ServletContextHandler();
|
||||
JettyWebSocketServletContainerInitializer.configure(context);
|
||||
context.setContextPath("/");
|
||||
ServletHolder holder = new ServletHolder(new WebSocketServlet()
|
||||
{
|
||||
@Override
|
||||
public void configure(WebSocketServletFactory factory)
|
||||
{
|
||||
factory.setIdleTimeout(Duration.ofSeconds(10));
|
||||
factory.setMaxTextMessageSize(1024 * 1024 * 2);
|
||||
factory.register(ServerEndpoint.class);
|
||||
}
|
||||
});
|
||||
context.addServlet(holder, "/ws");
|
||||
|
||||
HandlerList handlers = new HandlerList();
|
||||
handlers.addHandler(context);
|
||||
handlers.addHandler(new DefaultHandler());
|
||||
server.setHandler(handlers);
|
||||
|
||||
server.start();
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
public void stopClient() throws Exception
|
||||
{
|
||||
client.stop();
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
public void stopServer() throws Exception
|
||||
{
|
||||
server.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAbruptClientClose() throws Exception
|
||||
{
|
||||
CloseTrackingEndpoint wsocket = new CloseTrackingEndpoint();
|
||||
|
||||
URI wsUri = WSURI.toWebsocket(server.getURI().resolve("/ws"));
|
||||
Future<Session> future = client.connect(wsocket, wsUri);
|
||||
|
||||
// Validate that we are connected
|
||||
future.get(30, TimeUnit.SECONDS);
|
||||
|
||||
// Have client disconnect abruptly
|
||||
Session session = wsocket.getSession();
|
||||
session.disconnect();
|
||||
|
||||
// Client Socket should see a close event, with status NO_CLOSE
|
||||
// This event is automatically supplied by the underlying WebSocketClientConnection
|
||||
// in the situation of a bad network connection.
|
||||
wsocket.assertReceivedCloseEvent(5000, is(StatusCode.NO_CLOSE), containsString(""));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAbruptServerClose() throws Exception
|
||||
{
|
||||
CloseTrackingEndpoint wsocket = new CloseTrackingEndpoint();
|
||||
|
||||
URI wsUri = WSURI.toWebsocket(server.getURI().resolve("/ws"));
|
||||
Future<Session> future = client.connect(wsocket, wsUri);
|
||||
|
||||
// Validate that we are connected
|
||||
Session session = future.get(30, TimeUnit.SECONDS);
|
||||
|
||||
// Have server disconnect abruptly
|
||||
session.getRemote().sendString("abort");
|
||||
|
||||
// Client Socket should see a close event, with status NO_CLOSE
|
||||
// This event is automatically supplied by the underlying WebSocketClientConnection
|
||||
// in the situation of a bad network connection.
|
||||
wsocket.assertReceivedCloseEvent(5000, is(StatusCode.NO_CLOSE), containsString(""));
|
||||
}
|
||||
|
||||
public static class ServerEndpoint implements WebSocketListener
|
||||
{
|
||||
private static final Logger LOG = Log.getLogger(ClientCloseTest.ServerEndpoint.class);
|
||||
private Session session;
|
||||
|
||||
@Override
|
||||
public void onWebSocketBinary(byte[] payload, int offset, int len)
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onWebSocketText(String message)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (message.equals("abort"))
|
||||
{
|
||||
session.disconnect();
|
||||
}
|
||||
else
|
||||
{
|
||||
// simple echo
|
||||
session.getRemote().sendString(message);
|
||||
}
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
LOG.warn(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onWebSocketClose(int statusCode, String reason)
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onWebSocketConnect(Session session)
|
||||
{
|
||||
this.session = session;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onWebSocketError(Throwable cause)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug(cause);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,483 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.websocket.tests.client;
|
||||
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import org.eclipse.jetty.io.EndPoint;
|
||||
import org.eclipse.jetty.server.Server;
|
||||
import org.eclipse.jetty.server.ServerConnector;
|
||||
import org.eclipse.jetty.server.handler.DefaultHandler;
|
||||
import org.eclipse.jetty.server.handler.HandlerList;
|
||||
import org.eclipse.jetty.servlet.ServletContextHandler;
|
||||
import org.eclipse.jetty.servlet.ServletHolder;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.websocket.api.CloseException;
|
||||
import org.eclipse.jetty.websocket.api.MessageTooLargeException;
|
||||
import org.eclipse.jetty.websocket.api.Session;
|
||||
import org.eclipse.jetty.websocket.api.StatusCode;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketFrameListener;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketListener;
|
||||
import org.eclipse.jetty.websocket.api.util.WSURI;
|
||||
import org.eclipse.jetty.websocket.client.WebSocketClient;
|
||||
import org.eclipse.jetty.websocket.core.CloseStatus;
|
||||
import org.eclipse.jetty.websocket.core.OpCode;
|
||||
import org.eclipse.jetty.websocket.server.JettyWebSocketServletContainerInitializer;
|
||||
import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
|
||||
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
|
||||
import org.eclipse.jetty.websocket.tests.CloseTrackingEndpoint;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Disabled;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import static java.nio.charset.StandardCharsets.UTF_8;
|
||||
import static java.time.Duration.ofSeconds;
|
||||
import static java.util.concurrent.TimeUnit.SECONDS;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
|
||||
|
||||
@Disabled("Needs triage")
|
||||
public class ClientCloseTest
|
||||
{
|
||||
private Server server;
|
||||
private WebSocketClient client;
|
||||
|
||||
private Session confirmConnection(CloseTrackingEndpoint clientSocket, Future<Session> clientFuture) throws Exception
|
||||
{
|
||||
// Wait for client connect on via future
|
||||
Session session = clientFuture.get(30, SECONDS);
|
||||
|
||||
try
|
||||
{
|
||||
// Send message from client to server
|
||||
final String echoMsg = "echo-test";
|
||||
Future<Void> testFut = clientSocket.getRemote().sendStringByFuture(echoMsg);
|
||||
|
||||
// Wait for send future
|
||||
testFut.get(5, SECONDS);
|
||||
|
||||
// Verify received message
|
||||
String recvMsg = clientSocket.messageQueue.poll(5, SECONDS);
|
||||
assertThat("Received message", recvMsg, is(echoMsg));
|
||||
|
||||
// Verify that there are no errors
|
||||
assertThat("Error events", clientSocket.error.get(), nullValue());
|
||||
}
|
||||
finally
|
||||
{
|
||||
clientSocket.clearQueues();
|
||||
}
|
||||
|
||||
return session;
|
||||
}
|
||||
|
||||
@BeforeEach
|
||||
public void startClient() throws Exception
|
||||
{
|
||||
client = new WebSocketClient();
|
||||
client.setMaxTextMessageSize(1024);
|
||||
client.start();
|
||||
}
|
||||
|
||||
@BeforeEach
|
||||
public void startServer() throws Exception
|
||||
{
|
||||
server = new Server();
|
||||
|
||||
ServerConnector connector = new ServerConnector(server);
|
||||
connector.setPort(0);
|
||||
server.addConnector(connector);
|
||||
|
||||
ServletContextHandler context = new ServletContextHandler();
|
||||
JettyWebSocketServletContainerInitializer.configure(context);
|
||||
context.setContextPath("/");
|
||||
ServletHolder holder = new ServletHolder(new WebSocketServlet()
|
||||
{
|
||||
@Override
|
||||
public void configure(WebSocketServletFactory factory)
|
||||
{
|
||||
factory.setIdleTimeout(Duration.ofSeconds(10));
|
||||
factory.setMaxTextMessageSize(1024 * 1024 * 2);
|
||||
factory.register(ServerEndpoint.class);
|
||||
}
|
||||
});
|
||||
context.addServlet(holder, "/ws");
|
||||
|
||||
HandlerList handlers = new HandlerList();
|
||||
handlers.addHandler(context);
|
||||
handlers.addHandler(new DefaultHandler());
|
||||
server.setHandler(handlers);
|
||||
|
||||
server.start();
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
public void stopClient() throws Exception
|
||||
{
|
||||
client.stop();
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
public void stopServer() throws Exception
|
||||
{
|
||||
server.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHalfClose() throws Exception
|
||||
{
|
||||
// Set client timeout
|
||||
final int timeout = 5000;
|
||||
client.setIdleTimeout(Duration.ofMillis(timeout));
|
||||
|
||||
ClientOpenSessionTracker clientSessionTracker = new ClientOpenSessionTracker(1);
|
||||
clientSessionTracker.addTo(client);
|
||||
|
||||
// Client connects
|
||||
URI wsUri = WSURI.toWebsocket(server.getURI().resolve("/ws"));
|
||||
CloseTrackingEndpoint clientSocket = new CloseTrackingEndpoint();
|
||||
Future<Session> clientConnectFuture = client.connect(clientSocket, wsUri);
|
||||
|
||||
try (Session session = confirmConnection(clientSocket, clientConnectFuture))
|
||||
{
|
||||
// client confirms connection via echo
|
||||
|
||||
// client sends close frame (code 1000, normal)
|
||||
final String origCloseReason = "send-more-frames";
|
||||
clientSocket.getSession().close(StatusCode.NORMAL, origCloseReason);
|
||||
|
||||
// Verify received messages
|
||||
String recvMsg = clientSocket.messageQueue.poll(5, SECONDS);
|
||||
assertThat("Received message 1", recvMsg, is("Hello"));
|
||||
recvMsg = clientSocket.messageQueue.poll(5, SECONDS);
|
||||
assertThat("Received message 2", recvMsg, is("World"));
|
||||
|
||||
// Verify that there are no errors
|
||||
assertThat("Error events", clientSocket.error.get(), nullValue());
|
||||
|
||||
// client close event on ws-endpoint
|
||||
clientSocket.assertReceivedCloseEvent(timeout, is(StatusCode.NORMAL), containsString(""));
|
||||
}
|
||||
|
||||
clientSessionTracker.assertClosedProperly(client);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMessageTooLargeException() throws Exception
|
||||
{
|
||||
// Set client timeout
|
||||
final int timeout = 3000;
|
||||
client.setIdleTimeout(Duration.ofMillis(timeout));
|
||||
|
||||
ClientOpenSessionTracker clientSessionTracker = new ClientOpenSessionTracker(1);
|
||||
clientSessionTracker.addTo(client);
|
||||
|
||||
// Client connects
|
||||
URI wsUri = WSURI.toWebsocket(server.getURI().resolve("/ws"));
|
||||
CloseTrackingEndpoint clientSocket = new CloseTrackingEndpoint();
|
||||
Future<Session> clientConnectFuture = client.connect(clientSocket, wsUri);
|
||||
|
||||
try (Session session = confirmConnection(clientSocket, clientConnectFuture))
|
||||
{
|
||||
// client confirms connection via echo
|
||||
|
||||
session.getRemote().sendString("too-large-message");
|
||||
|
||||
clientSocket.assertReceivedCloseEvent(timeout, is(StatusCode.MESSAGE_TOO_LARGE), containsString("exceeds maximum size"));
|
||||
|
||||
// client should have noticed the error
|
||||
assertThat("OnError Latch", clientSocket.errorLatch.await(2, SECONDS), is(true));
|
||||
assertThat("OnError", clientSocket.error.get(), instanceOf(MessageTooLargeException.class));
|
||||
}
|
||||
|
||||
// client triggers close event on client ws-endpoint
|
||||
clientSessionTracker.assertClosedProperly(client);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRemoteDisconnect() throws Exception
|
||||
{
|
||||
// Set client timeout
|
||||
final int clientTimeout = 1000;
|
||||
client.setIdleTimeout(Duration.ofMillis(clientTimeout));
|
||||
|
||||
ClientOpenSessionTracker clientSessionTracker = new ClientOpenSessionTracker(1);
|
||||
clientSessionTracker.addTo(client);
|
||||
|
||||
// Client connects
|
||||
URI wsUri = WSURI.toWebsocket(server.getURI().resolve("/ws"));
|
||||
CloseTrackingEndpoint clientSocket = new CloseTrackingEndpoint();
|
||||
Future<Session> clientConnectFuture = client.connect(clientSocket, wsUri);
|
||||
|
||||
try (Session ignored = confirmConnection(clientSocket, clientConnectFuture))
|
||||
{
|
||||
// client confirms connection via echo
|
||||
|
||||
// client sends close frame (triggering server connection abort)
|
||||
final String origCloseReason = "abort";
|
||||
clientSocket.getSession().close(StatusCode.NORMAL, origCloseReason);
|
||||
|
||||
// client reads -1 (EOF)
|
||||
// client triggers close event on client ws-endpoint
|
||||
clientSocket.assertReceivedCloseEvent(clientTimeout * 2,
|
||||
is(StatusCode.SHUTDOWN),
|
||||
containsString("timeout"));
|
||||
}
|
||||
|
||||
clientSessionTracker.assertClosedProperly(client);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testServerNoCloseHandshake() throws Exception
|
||||
{
|
||||
// Set client timeout
|
||||
final int clientTimeout = 1000;
|
||||
client.setIdleTimeout(Duration.ofMillis(clientTimeout));
|
||||
|
||||
ClientOpenSessionTracker clientSessionTracker = new ClientOpenSessionTracker(1);
|
||||
clientSessionTracker.addTo(client);
|
||||
|
||||
// Client connects
|
||||
URI wsUri = WSURI.toWebsocket(server.getURI().resolve("/ws"));
|
||||
CloseTrackingEndpoint clientSocket = new CloseTrackingEndpoint();
|
||||
Future<Session> clientConnectFuture = client.connect(clientSocket, wsUri);
|
||||
|
||||
try (Session ignored = confirmConnection(clientSocket, clientConnectFuture))
|
||||
{
|
||||
// client confirms connection via echo
|
||||
|
||||
// client sends close frame
|
||||
final String origCloseReason = "sleep|5000";
|
||||
clientSocket.getSession().close(StatusCode.NORMAL, origCloseReason);
|
||||
|
||||
// client close should occur
|
||||
clientSocket.assertReceivedCloseEvent(clientTimeout * 2,
|
||||
is(StatusCode.SHUTDOWN),
|
||||
containsString("timeout"));
|
||||
|
||||
// client idle timeout triggers close event on client ws-endpoint
|
||||
assertThat("OnError Latch", clientSocket.errorLatch.await(2, SECONDS), is(true));
|
||||
assertThat("OnError", clientSocket.error.get(), instanceOf(CloseException.class));
|
||||
assertThat("OnError.cause", clientSocket.error.get().getCause(), instanceOf(TimeoutException.class));
|
||||
}
|
||||
|
||||
clientSessionTracker.assertClosedProperly(client);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStopLifecycle() throws Exception
|
||||
{
|
||||
// Set client timeout
|
||||
final int timeout = 1000;
|
||||
client.setIdleTimeout(Duration.ofMillis(timeout));
|
||||
|
||||
int sessionCount = 3;
|
||||
ClientOpenSessionTracker clientSessionTracker = new ClientOpenSessionTracker(sessionCount);
|
||||
clientSessionTracker.addTo(client);
|
||||
|
||||
URI wsUri = WSURI.toWebsocket(server.getURI().resolve("/ws"));
|
||||
List<CloseTrackingEndpoint> clientSockets = new ArrayList<>();
|
||||
|
||||
// Open Multiple Clients
|
||||
for (int i = 0; i < sessionCount; i++)
|
||||
{
|
||||
// Client Request Upgrade
|
||||
CloseTrackingEndpoint clientSocket = new CloseTrackingEndpoint();
|
||||
clientSockets.add(clientSocket);
|
||||
Future<Session> clientConnectFuture = client.connect(clientSocket, wsUri);
|
||||
|
||||
// client confirms connection via echo
|
||||
confirmConnection(clientSocket, clientConnectFuture);
|
||||
}
|
||||
|
||||
assertTimeoutPreemptively(ofSeconds(5), () -> {
|
||||
// client lifecycle stop (the meat of this test)
|
||||
client.stop();
|
||||
});
|
||||
|
||||
// clients disconnect
|
||||
for (int i = 0; i < sessionCount; i++)
|
||||
{
|
||||
clientSockets.get(i).assertReceivedCloseEvent(timeout, is(StatusCode.ABNORMAL), containsString("Disconnected"));
|
||||
}
|
||||
|
||||
// ensure all Sessions are gone. connections are gone. etc. (client and server)
|
||||
// ensure ConnectionListener onClose is called 3 times
|
||||
clientSessionTracker.assertClosedProperly(client);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWriteException() throws Exception
|
||||
{
|
||||
// Set client timeout
|
||||
final int timeout = 2000;
|
||||
client.setIdleTimeout(Duration.ofMillis(timeout));
|
||||
|
||||
ClientOpenSessionTracker clientSessionTracker = new ClientOpenSessionTracker(1);
|
||||
clientSessionTracker.addTo(client);
|
||||
|
||||
// Client connects
|
||||
URI wsUri = WSURI.toWebsocket(server.getURI().resolve("/ws"));
|
||||
CloseTrackingEndpoint clientSocket = new CloseTrackingEndpoint();
|
||||
Future<Session> clientConnectFuture = client.connect(clientSocket, wsUri);
|
||||
|
||||
// client confirms connection via echo
|
||||
confirmConnection(clientSocket, clientConnectFuture);
|
||||
|
||||
// setup client endpoint for write failure (test only)
|
||||
EndPoint endp = clientSocket.getEndPoint();
|
||||
endp.shutdownOutput();
|
||||
|
||||
// client enqueue close frame
|
||||
// should result in a client write failure
|
||||
final String origCloseReason = "Normal Close from Client";
|
||||
clientSocket.getSession().close(StatusCode.NORMAL, origCloseReason);
|
||||
|
||||
assertThat("OnError Latch", clientSocket.errorLatch.await(2, SECONDS), is(true));
|
||||
assertThat("OnError", clientSocket.error.get(), instanceOf(ClosedChannelException.class));
|
||||
|
||||
// client triggers close event on client ws-endpoint
|
||||
// assert - close code==1006 (abnormal)
|
||||
clientSocket.assertReceivedCloseEvent(timeout, is(StatusCode.ABNORMAL), containsString("Eof"));
|
||||
|
||||
clientSessionTracker.assertClosedProperly(client);
|
||||
}
|
||||
|
||||
public static class ServerEndpoint implements WebSocketFrameListener, WebSocketListener
|
||||
{
|
||||
private static final Logger LOG = Log.getLogger(ServerEndpoint.class);
|
||||
private Session session;
|
||||
|
||||
@Override
|
||||
public void onWebSocketBinary(byte[] payload, int offset, int len)
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onWebSocketText(String message)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (message.equals("too-large-message"))
|
||||
{
|
||||
// send extra large message
|
||||
byte[] buf = new byte[1024 * 1024];
|
||||
Arrays.fill(buf, (byte) 'x');
|
||||
String bigmsg = new String(buf, UTF_8);
|
||||
session.getRemote().sendString(bigmsg);
|
||||
}
|
||||
else
|
||||
{
|
||||
// simple echo
|
||||
session.getRemote().sendString(message);
|
||||
}
|
||||
}
|
||||
catch (IOException ignore)
|
||||
{
|
||||
LOG.debug(ignore);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onWebSocketClose(int statusCode, String reason)
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onWebSocketConnect(Session session)
|
||||
{
|
||||
this.session = session;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onWebSocketError(Throwable cause)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug(cause);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onWebSocketFrame(org.eclipse.jetty.websocket.api.extensions.Frame frame)
|
||||
{
|
||||
if (frame.getOpCode() == OpCode.CLOSE)
|
||||
{
|
||||
CloseStatus closeInfo = new CloseStatus(frame.getPayload());
|
||||
String reason = closeInfo.getReason();
|
||||
|
||||
if (reason.equals("send-more-frames"))
|
||||
{
|
||||
try
|
||||
{
|
||||
session.getRemote().sendString("Hello");
|
||||
session.getRemote().sendString("World");
|
||||
}
|
||||
catch (Throwable ignore)
|
||||
{
|
||||
LOG.debug("OOPS", ignore);
|
||||
}
|
||||
}
|
||||
else if (reason.equals("abort"))
|
||||
{
|
||||
try
|
||||
{
|
||||
SECONDS.sleep(1);
|
||||
LOG.info("Server aborting session abruptly");
|
||||
session.disconnect();
|
||||
}
|
||||
catch (Throwable ignore)
|
||||
{
|
||||
LOG.ignore(ignore);
|
||||
}
|
||||
}
|
||||
else if (reason.startsWith("sleep|"))
|
||||
{
|
||||
int idx = reason.indexOf('|');
|
||||
int timeMs = Integer.parseInt(reason.substring(idx + 1));
|
||||
try
|
||||
{
|
||||
LOG.info("Server Sleeping for {} ms", timeMs);
|
||||
TimeUnit.MILLISECONDS.sleep(timeMs);
|
||||
}
|
||||
catch (InterruptedException ignore)
|
||||
{
|
||||
LOG.ignore(ignore);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,76 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.websocket.tests.client;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
import org.eclipse.jetty.io.Connection;
|
||||
import org.eclipse.jetty.websocket.client.WebSocketClient;
|
||||
import org.eclipse.jetty.websocket.common.WebSocketSessionImpl;
|
||||
import org.eclipse.jetty.websocket.common.WebSocketSessionListener;
|
||||
|
||||
import static java.util.concurrent.TimeUnit.SECONDS;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
public class ClientOpenSessionTracker implements Connection.Listener, WebSocketSessionListener
|
||||
{
|
||||
private final CountDownLatch closeSessionLatch;
|
||||
private final CountDownLatch closeConnectionLatch;
|
||||
|
||||
public ClientOpenSessionTracker(int expectedSessions)
|
||||
{
|
||||
this.closeSessionLatch = new CountDownLatch(expectedSessions);
|
||||
this.closeConnectionLatch = new CountDownLatch(expectedSessions);
|
||||
}
|
||||
|
||||
public void addTo(WebSocketClient client)
|
||||
{
|
||||
client.addSessionListener(this);
|
||||
client.addBean(this);
|
||||
}
|
||||
|
||||
public void assertClosedProperly(WebSocketClient client) throws InterruptedException
|
||||
{
|
||||
assertTrue(closeConnectionLatch.await(5, SECONDS), "All Jetty Connections should have been closed");
|
||||
assertTrue(closeSessionLatch.await(5, SECONDS), "All WebSocket Sessions should have been closed");
|
||||
assertTrue(client.getOpenSessions().isEmpty(), "Client OpenSessions MUST be empty");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onOpened(Connection connection)
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onClosed(Connection connection)
|
||||
{
|
||||
this.closeConnectionLatch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onWebSocketSessionOpened(WebSocketSessionImpl session)
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onWebSocketSessionClosed(WebSocketSessionImpl session)
|
||||
{
|
||||
this.closeSessionLatch.countDown();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,165 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.websocket.tests.client;
|
||||
|
||||
import java.net.URI;
|
||||
import java.time.Duration;
|
||||
import java.util.Collection;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.eclipse.jetty.server.Server;
|
||||
import org.eclipse.jetty.server.ServerConnector;
|
||||
import org.eclipse.jetty.server.handler.DefaultHandler;
|
||||
import org.eclipse.jetty.server.handler.HandlerList;
|
||||
import org.eclipse.jetty.servlet.ServletContextHandler;
|
||||
import org.eclipse.jetty.servlet.ServletHolder;
|
||||
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
|
||||
import org.eclipse.jetty.websocket.api.Session;
|
||||
import org.eclipse.jetty.websocket.api.StatusCode;
|
||||
import org.eclipse.jetty.websocket.api.util.WSURI;
|
||||
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
|
||||
import org.eclipse.jetty.websocket.client.WebSocketClient;
|
||||
import org.eclipse.jetty.websocket.common.WebSocketSessionImpl;
|
||||
import org.eclipse.jetty.websocket.common.WebSocketSessionListener;
|
||||
import org.eclipse.jetty.websocket.server.JettyWebSocketServletContainerInitializer;
|
||||
import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
|
||||
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
|
||||
import org.eclipse.jetty.websocket.tests.CloseTrackingEndpoint;
|
||||
import org.eclipse.jetty.websocket.tests.EchoCreator;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Disabled;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
@Disabled("Needs triage")
|
||||
public class ClientSessionsTest
|
||||
{
|
||||
private Server server;
|
||||
|
||||
@BeforeEach
|
||||
public void startServer() throws Exception
|
||||
{
|
||||
server = new Server();
|
||||
|
||||
ServerConnector connector = new ServerConnector(server);
|
||||
connector.setPort(0);
|
||||
server.addConnector(connector);
|
||||
|
||||
ServletContextHandler context = new ServletContextHandler();
|
||||
JettyWebSocketServletContainerInitializer.configure(context);
|
||||
context.setContextPath("/");
|
||||
ServletHolder holder = new ServletHolder(new WebSocketServlet()
|
||||
{
|
||||
@Override
|
||||
public void configure(WebSocketServletFactory factory)
|
||||
{
|
||||
factory.setIdleTimeout(Duration.ofSeconds(10));
|
||||
factory.setMaxTextMessageSize(1024 * 1024 * 2);
|
||||
factory.setCreator(new EchoCreator());
|
||||
}
|
||||
});
|
||||
context.addServlet(holder, "/ws");
|
||||
|
||||
HandlerList handlers = new HandlerList();
|
||||
handlers.addHandler(context);
|
||||
handlers.addHandler(new DefaultHandler());
|
||||
server.setHandler(handlers);
|
||||
|
||||
server.start();
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
public void stopServer() throws Exception
|
||||
{
|
||||
server.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBasicEcho_FromClient() throws Exception
|
||||
{
|
||||
WebSocketClient client = new WebSocketClient();
|
||||
|
||||
CountDownLatch onSessionCloseLatch = new CountDownLatch(1);
|
||||
|
||||
client.addSessionListener(new WebSocketSessionListener() {
|
||||
@Override
|
||||
public void onWebSocketSessionOpened(WebSocketSessionImpl session)
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onWebSocketSessionClosed(WebSocketSessionImpl session)
|
||||
{
|
||||
onSessionCloseLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
client.start();
|
||||
try
|
||||
{
|
||||
CloseTrackingEndpoint cliSock = new CloseTrackingEndpoint();
|
||||
client.setIdleTimeout(Duration.ofSeconds(10));
|
||||
|
||||
URI wsUri = WSURI.toWebsocket(server.getURI().resolve("/ws"));
|
||||
ClientUpgradeRequest request = new ClientUpgradeRequest();
|
||||
request.setSubProtocols("echo");
|
||||
Future<Session> future = client.connect(cliSock,wsUri,request);
|
||||
|
||||
try (Session sess = future.get(30000, TimeUnit.MILLISECONDS))
|
||||
{
|
||||
assertThat("Session", sess, notNullValue());
|
||||
assertThat("Session.open", sess.isOpen(), is(true));
|
||||
assertThat("Session.upgradeRequest", sess.getUpgradeRequest(), notNullValue());
|
||||
assertThat("Session.upgradeResponse", sess.getUpgradeResponse(), notNullValue());
|
||||
|
||||
Collection<Session> sessions = client.getOpenSessions();
|
||||
assertThat("client.connectionManager.sessions.size", sessions.size(), is(1));
|
||||
|
||||
RemoteEndpoint remote = sess.getRemote();
|
||||
remote.sendString("Hello World!");
|
||||
|
||||
Collection<Session> open = client.getOpenSessions();
|
||||
assertThat("(Before Close) Open Sessions.size", open.size(), is(1));
|
||||
|
||||
String received = cliSock.messageQueue.poll(5, TimeUnit.SECONDS);
|
||||
assertThat("Message", received, containsString("Hello World!"));
|
||||
}
|
||||
|
||||
cliSock.assertReceivedCloseEvent(30000, is(StatusCode.NORMAL));
|
||||
|
||||
assertTrue(onSessionCloseLatch.await(5, TimeUnit.SECONDS), "Saw onSessionClose events");
|
||||
TimeUnit.SECONDS.sleep(1);
|
||||
|
||||
Collection<Session> open = client.getOpenSessions();
|
||||
assertThat("(After Close) Open Sessions.size", open.size(), is(0));
|
||||
}
|
||||
finally
|
||||
{
|
||||
client.stop();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,107 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.websocket.tests.client;
|
||||
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.websocket.api.BatchMode;
|
||||
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
|
||||
import org.eclipse.jetty.websocket.api.Session;
|
||||
|
||||
public class ClientWriteThread extends Thread
|
||||
{
|
||||
private static final Logger LOG = Log.getLogger(ClientWriteThread.class);
|
||||
private final Session session;
|
||||
private int slowness = -1;
|
||||
private int messageCount = 100;
|
||||
private String message = "Hello";
|
||||
|
||||
public ClientWriteThread(Session session)
|
||||
{
|
||||
this.session = session;
|
||||
}
|
||||
|
||||
public String getMessage()
|
||||
{
|
||||
return message;
|
||||
}
|
||||
|
||||
public int getMessageCount()
|
||||
{
|
||||
return messageCount;
|
||||
}
|
||||
|
||||
public int getSlowness()
|
||||
{
|
||||
return slowness;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
final AtomicInteger m = new AtomicInteger();
|
||||
|
||||
try
|
||||
{
|
||||
LOG.debug("Writing {} messages to connection {}",messageCount);
|
||||
LOG.debug("Artificial Slowness {} ms",slowness);
|
||||
Future<Void> lastMessage = null;
|
||||
RemoteEndpoint remote = session.getRemote();
|
||||
while (m.get() < messageCount)
|
||||
{
|
||||
lastMessage = remote.sendStringByFuture(message + "/" + m.get() + "/");
|
||||
|
||||
m.incrementAndGet();
|
||||
|
||||
if (slowness > 0)
|
||||
{
|
||||
TimeUnit.MILLISECONDS.sleep(slowness);
|
||||
}
|
||||
}
|
||||
if (remote.getBatchMode() == BatchMode.ON)
|
||||
remote.flush();
|
||||
// block on write of last message
|
||||
if (lastMessage != null)
|
||||
lastMessage.get(2,TimeUnit.MINUTES); // block on write
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
LOG.warn(e);
|
||||
}
|
||||
}
|
||||
|
||||
public void setMessage(String message)
|
||||
{
|
||||
this.message = message;
|
||||
}
|
||||
|
||||
public void setMessageCount(int messageCount)
|
||||
{
|
||||
this.messageCount = messageCount;
|
||||
}
|
||||
|
||||
public void setSlowness(int slowness)
|
||||
{
|
||||
this.slowness = slowness;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,144 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.websocket.tests.client;
|
||||
|
||||
import java.net.URI;
|
||||
import java.time.Duration;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import org.eclipse.jetty.server.Server;
|
||||
import org.eclipse.jetty.server.ServerConnector;
|
||||
import org.eclipse.jetty.server.handler.DefaultHandler;
|
||||
import org.eclipse.jetty.server.handler.HandlerList;
|
||||
import org.eclipse.jetty.servlet.ServletContextHandler;
|
||||
import org.eclipse.jetty.servlet.ServletHolder;
|
||||
import org.eclipse.jetty.websocket.api.Session;
|
||||
import org.eclipse.jetty.websocket.api.StatusCode;
|
||||
import org.eclipse.jetty.websocket.api.util.WSURI;
|
||||
import org.eclipse.jetty.websocket.client.WebSocketClient;
|
||||
import org.eclipse.jetty.websocket.server.JettyWebSocketServletContainerInitializer;
|
||||
import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
|
||||
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
|
||||
import org.eclipse.jetty.websocket.tests.CloseTrackingEndpoint;
|
||||
import org.eclipse.jetty.websocket.tests.EchoSocket;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import static java.util.concurrent.TimeUnit.SECONDS;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
|
||||
/**
|
||||
* This Regression Test Exists because of Client side Idle timeout, Read, and Parser bugs.
|
||||
*/
|
||||
public class SlowClientTest
|
||||
{
|
||||
private Server server;
|
||||
private WebSocketClient client;
|
||||
|
||||
@BeforeEach
|
||||
public void startClient() throws Exception
|
||||
{
|
||||
client = new WebSocketClient();
|
||||
client.setIdleTimeout(Duration.ofSeconds(60));
|
||||
client.start();
|
||||
}
|
||||
|
||||
@BeforeEach
|
||||
public void startServer() throws Exception
|
||||
{
|
||||
server = new Server();
|
||||
|
||||
ServerConnector connector = new ServerConnector(server);
|
||||
connector.setPort(0);
|
||||
server.addConnector(connector);
|
||||
|
||||
ServletContextHandler context = new ServletContextHandler();
|
||||
context.setContextPath("/");
|
||||
ServletHolder websocket = new ServletHolder(new WebSocketServlet()
|
||||
{
|
||||
@Override
|
||||
public void configure(WebSocketServletFactory factory)
|
||||
{
|
||||
factory.register(EchoSocket.class);
|
||||
}
|
||||
});
|
||||
context.addServlet(websocket, "/ws");
|
||||
JettyWebSocketServletContainerInitializer.configure(context);
|
||||
|
||||
HandlerList handlers = new HandlerList();
|
||||
handlers.addHandler(context);
|
||||
handlers.addHandler(new DefaultHandler());
|
||||
|
||||
server.setHandler(handlers);
|
||||
|
||||
server.start();
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
public void stopClient() throws Exception
|
||||
{
|
||||
client.stop();
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
public void stopServer() throws Exception
|
||||
{
|
||||
server.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClientSlowToSend() throws Exception
|
||||
{
|
||||
CloseTrackingEndpoint clientEndpoint = new CloseTrackingEndpoint();
|
||||
client.setIdleTimeout(Duration.ofSeconds(60));
|
||||
|
||||
URI wsUri = WSURI.toWebsocket(server.getURI().resolve("/ws"));
|
||||
Future<Session> future = client.connect(clientEndpoint, wsUri);
|
||||
|
||||
// Confirm connected
|
||||
Session session = future.get(5, SECONDS);
|
||||
|
||||
int messageCount = 10;
|
||||
try
|
||||
{
|
||||
// Have client write slowly.
|
||||
ClientWriteThread writer = new ClientWriteThread(clientEndpoint.getSession());
|
||||
writer.setMessageCount(messageCount);
|
||||
writer.setMessage("Hello");
|
||||
writer.setSlowness(10);
|
||||
writer.start();
|
||||
writer.join();
|
||||
|
||||
// Close
|
||||
clientEndpoint.getSession().close(StatusCode.NORMAL, "Done");
|
||||
|
||||
// confirm close received on server
|
||||
clientEndpoint.assertReceivedCloseEvent(10000, is(StatusCode.NORMAL), containsString("Done"));
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (session != null)
|
||||
{
|
||||
session.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,76 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.websocket.tests.server;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
|
||||
import org.hamcrest.Matcher;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
|
||||
public abstract class AbstractCloseEndpoint extends WebSocketAdapter
|
||||
{
|
||||
public final Logger LOG;
|
||||
public CountDownLatch closeLatch = new CountDownLatch(1);
|
||||
public String closeReason = null;
|
||||
public int closeStatusCode = -1;
|
||||
public LinkedBlockingQueue<Throwable> errors = new LinkedBlockingQueue<>();
|
||||
|
||||
public AbstractCloseEndpoint()
|
||||
{
|
||||
this.LOG = Log.getLogger(this.getClass().getName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onWebSocketClose(int statusCode, String reason)
|
||||
{
|
||||
LOG.debug("onWebSocketClose({}, {})",statusCode,reason);
|
||||
this.closeStatusCode = statusCode;
|
||||
this.closeReason = reason;
|
||||
closeLatch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onWebSocketError(Throwable cause)
|
||||
{
|
||||
errors.offer(cause);
|
||||
}
|
||||
|
||||
public void assertReceivedCloseEvent(int clientTimeoutMs, Matcher<Integer> statusCodeMatcher, Matcher<String> reasonMatcher)
|
||||
throws InterruptedException
|
||||
{
|
||||
assertThat("Client Close Event Occurred", closeLatch.await(clientTimeoutMs, TimeUnit.MILLISECONDS), is(true));
|
||||
assertThat("Client Close Event Status Code", closeStatusCode, statusCodeMatcher);
|
||||
if (reasonMatcher == null)
|
||||
{
|
||||
assertThat("Client Close Event Reason", closeReason, nullValue());
|
||||
}
|
||||
else
|
||||
{
|
||||
assertThat("Client Close Event Reason", closeReason, reasonMatcher);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,67 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.websocket.tests.server;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
import org.eclipse.jetty.websocket.api.Session;
|
||||
import org.eclipse.jetty.websocket.api.StatusCode;
|
||||
import org.eclipse.jetty.websocket.common.WebSocketContainer;
|
||||
|
||||
/**
|
||||
* On Message, return container information
|
||||
*/
|
||||
public class ContainerEndpoint extends AbstractCloseEndpoint
|
||||
{
|
||||
private final WebSocketContainer container;
|
||||
private Session session;
|
||||
|
||||
public ContainerEndpoint(WebSocketContainer container)
|
||||
{
|
||||
super();
|
||||
this.container = container;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onWebSocketText(String message)
|
||||
{
|
||||
LOG.debug("onWebSocketText({})",message);
|
||||
if (message.equalsIgnoreCase("openSessions"))
|
||||
{
|
||||
Collection<Session> sessions = container.getOpenSessions();
|
||||
|
||||
StringBuilder ret = new StringBuilder();
|
||||
ret.append("openSessions.size=").append(sessions.size()).append('\n');
|
||||
int idx = 0;
|
||||
for (Session sess : sessions)
|
||||
{
|
||||
ret.append('[').append(idx++).append("] ").append(sess.toString()).append('\n');
|
||||
}
|
||||
session.getRemote().sendStringByFuture(ret.toString());
|
||||
}
|
||||
session.close(StatusCode.NORMAL,"ContainerEndpoint");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onWebSocketConnect(Session sess)
|
||||
{
|
||||
LOG.debug("onWebSocketConnect({})",sess);
|
||||
this.session = sess;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,35 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.websocket.tests.server;
|
||||
|
||||
import org.eclipse.jetty.websocket.api.Session;
|
||||
import org.eclipse.jetty.websocket.api.StatusCode;
|
||||
|
||||
/**
|
||||
* On Connect, close socket
|
||||
*/
|
||||
public class FastCloseEndpoint extends AbstractCloseEndpoint
|
||||
{
|
||||
@Override
|
||||
public void onWebSocketConnect(Session sess)
|
||||
{
|
||||
LOG.debug("onWebSocketConnect({})", sess);
|
||||
sess.close(StatusCode.NORMAL, "FastCloseServer");
|
||||
}
|
||||
}
|
|
@ -0,0 +1,36 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.websocket.tests.server;
|
||||
|
||||
import org.eclipse.jetty.websocket.api.Session;
|
||||
|
||||
/**
|
||||
* On Connect, throw unhandled exception
|
||||
*/
|
||||
public class FastFailEndpoint extends AbstractCloseEndpoint
|
||||
{
|
||||
@Override
|
||||
public void onWebSocketConnect(Session sess)
|
||||
{
|
||||
LOG.debug("onWebSocketConnect({})",sess);
|
||||
// Test failure due to unhandled exception
|
||||
// this should trigger a fast-fail closure during open/connect
|
||||
throw new RuntimeException("Intentional FastFail");
|
||||
}
|
||||
}
|
|
@ -0,0 +1,82 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.websocket.tests.server;
|
||||
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import javax.servlet.ServletContext;
|
||||
|
||||
import org.eclipse.jetty.websocket.common.WebSocketContainer;
|
||||
import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
|
||||
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
|
||||
import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
|
||||
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
|
||||
import org.eclipse.jetty.websocket.tests.EchoSocket;
|
||||
|
||||
import static java.util.concurrent.TimeUnit.SECONDS;
|
||||
|
||||
public class ServerCloseCreator implements WebSocketCreator
|
||||
{
|
||||
private final WebSocketServletFactory serverFactory;
|
||||
private LinkedBlockingQueue<AbstractCloseEndpoint> createdSocketQueue = new LinkedBlockingQueue<>();
|
||||
|
||||
public ServerCloseCreator(WebSocketServletFactory serverFactory)
|
||||
{
|
||||
this.serverFactory = serverFactory;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object createWebSocket(ServletUpgradeRequest req, ServletUpgradeResponse resp)
|
||||
{
|
||||
AbstractCloseEndpoint closeSocket = null;
|
||||
|
||||
if (req.hasSubProtocol("fastclose"))
|
||||
{
|
||||
closeSocket = new FastCloseEndpoint();
|
||||
resp.setAcceptedSubProtocol("fastclose");
|
||||
}
|
||||
else if (req.hasSubProtocol("fastfail"))
|
||||
{
|
||||
closeSocket = new FastFailEndpoint();
|
||||
resp.setAcceptedSubProtocol("fastfail");
|
||||
}
|
||||
else if (req.hasSubProtocol("container"))
|
||||
{
|
||||
ServletContext context = req.getHttpServletRequest().getServletContext();
|
||||
WebSocketContainer container =
|
||||
(WebSocketContainer) context.getAttribute(WebSocketContainer.class.getName());
|
||||
closeSocket = new ContainerEndpoint(container);
|
||||
resp.setAcceptedSubProtocol("container");
|
||||
}
|
||||
|
||||
if (closeSocket != null)
|
||||
{
|
||||
createdSocketQueue.offer(closeSocket);
|
||||
return closeSocket;
|
||||
}
|
||||
else
|
||||
{
|
||||
return new EchoSocket();
|
||||
}
|
||||
}
|
||||
|
||||
public AbstractCloseEndpoint pollLastCreated() throws InterruptedException
|
||||
{
|
||||
return createdSocketQueue.poll(5, SECONDS);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,280 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.websocket.tests.server;
|
||||
|
||||
import java.net.URI;
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
import java.time.Duration;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import org.eclipse.jetty.server.Server;
|
||||
import org.eclipse.jetty.server.ServerConnector;
|
||||
import org.eclipse.jetty.server.handler.DefaultHandler;
|
||||
import org.eclipse.jetty.server.handler.HandlerList;
|
||||
import org.eclipse.jetty.servlet.ServletContextHandler;
|
||||
import org.eclipse.jetty.servlet.ServletHolder;
|
||||
import org.eclipse.jetty.util.log.StacklessLogging;
|
||||
import org.eclipse.jetty.websocket.api.Session;
|
||||
import org.eclipse.jetty.websocket.api.StatusCode;
|
||||
import org.eclipse.jetty.websocket.api.util.WSURI;
|
||||
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
|
||||
import org.eclipse.jetty.websocket.client.WebSocketClient;
|
||||
import org.eclipse.jetty.websocket.common.WebSocketSessionImpl;
|
||||
import org.eclipse.jetty.websocket.server.JettyWebSocketServletContainerInitializer;
|
||||
import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
|
||||
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
|
||||
import org.eclipse.jetty.websocket.tests.CloseTrackingEndpoint;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Disabled;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import static java.util.concurrent.TimeUnit.SECONDS;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
|
||||
/**
|
||||
* Tests various close scenarios
|
||||
*/
|
||||
@Disabled("Needs triage")
|
||||
public class ServerCloseTest
|
||||
{
|
||||
private WebSocketClient client;
|
||||
private Server server;
|
||||
private ServerCloseCreator serverEndpointCreator;
|
||||
|
||||
@BeforeEach
|
||||
public void startServer() throws Exception
|
||||
{
|
||||
server = new Server();
|
||||
|
||||
ServerConnector connector = new ServerConnector(server);
|
||||
connector.setPort(0);
|
||||
server.addConnector(connector);
|
||||
|
||||
ServletContextHandler context = new ServletContextHandler();
|
||||
context.setContextPath("/");
|
||||
|
||||
ServletHolder closeEndpoint = new ServletHolder(new WebSocketServlet()
|
||||
{
|
||||
@Override
|
||||
public void configure(WebSocketServletFactory factory)
|
||||
{
|
||||
factory.setIdleTimeout(Duration.ofSeconds(2));
|
||||
serverEndpointCreator = new ServerCloseCreator(factory);
|
||||
factory.setCreator(serverEndpointCreator);
|
||||
}
|
||||
});
|
||||
context.addServlet(closeEndpoint, "/ws");
|
||||
JettyWebSocketServletContainerInitializer.configure(context);
|
||||
|
||||
HandlerList handlers = new HandlerList();
|
||||
handlers.addHandler(context);
|
||||
handlers.addHandler(new DefaultHandler());
|
||||
|
||||
server.setHandler(handlers);
|
||||
|
||||
server.start();
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
public void stopServer() throws Exception
|
||||
{
|
||||
server.stop();
|
||||
}
|
||||
|
||||
@BeforeEach
|
||||
public void startClient() throws Exception
|
||||
{
|
||||
client = new WebSocketClient();
|
||||
client.setIdleTimeout(Duration.ofSeconds(2));
|
||||
client.start();
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
public void stopClient() throws Exception
|
||||
{
|
||||
client.stop();
|
||||
}
|
||||
|
||||
private void close(Session session)
|
||||
{
|
||||
if (session != null)
|
||||
{
|
||||
session.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test fast close (bug #403817)
|
||||
*
|
||||
* @throws Exception on test failure
|
||||
*/
|
||||
@Test
|
||||
public void fastClose() throws Exception
|
||||
{
|
||||
ClientUpgradeRequest request = new ClientUpgradeRequest();
|
||||
request.setSubProtocols("fastclose");
|
||||
CloseTrackingEndpoint clientEndpoint = new CloseTrackingEndpoint();
|
||||
|
||||
URI wsUri = WSURI.toWebsocket(server.getURI().resolve("/ws"));
|
||||
Future<Session> futSession = client.connect(clientEndpoint, wsUri, request);
|
||||
|
||||
Session session = null;
|
||||
try
|
||||
{
|
||||
session = futSession.get(5, SECONDS);
|
||||
|
||||
// Verify that client got close
|
||||
clientEndpoint.assertReceivedCloseEvent(5000, is(StatusCode.NORMAL), containsString(""));
|
||||
|
||||
// Verify that server socket got close event
|
||||
AbstractCloseEndpoint serverEndpoint = serverEndpointCreator.pollLastCreated();
|
||||
assertThat("Fast Close Latch", serverEndpoint.closeLatch.await(5, SECONDS), is(true));
|
||||
assertThat("Fast Close.statusCode", serverEndpoint.closeStatusCode, is(StatusCode.ABNORMAL));
|
||||
}
|
||||
finally
|
||||
{
|
||||
close(session);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test fast fail (bug #410537)
|
||||
*
|
||||
* @throws Exception on test failure
|
||||
*/
|
||||
@Test
|
||||
public void fastFail() throws Exception
|
||||
{
|
||||
ClientUpgradeRequest request = new ClientUpgradeRequest();
|
||||
request.setSubProtocols("fastfail");
|
||||
CloseTrackingEndpoint clientEndpoint = new CloseTrackingEndpoint();
|
||||
|
||||
URI wsUri = WSURI.toWebsocket(server.getURI().resolve("/ws"));
|
||||
Future<Session> futSession = client.connect(clientEndpoint, wsUri, request);
|
||||
|
||||
Session session = null;
|
||||
try(StacklessLogging ignore = new StacklessLogging(FastFailEndpoint.class, WebSocketSessionImpl.class))
|
||||
{
|
||||
session = futSession.get(5, SECONDS);
|
||||
|
||||
// Verify that client got close indicating SERVER_ERROR
|
||||
clientEndpoint.assertReceivedCloseEvent(5000, is(StatusCode.SERVER_ERROR), containsString("Intentional FastFail"));
|
||||
|
||||
// Verify that server socket got close event
|
||||
AbstractCloseEndpoint serverEndpoint = serverEndpointCreator.pollLastCreated();
|
||||
serverEndpoint.assertReceivedCloseEvent(5000, is(StatusCode.SERVER_ERROR), containsString("Intentional FastFail"));
|
||||
|
||||
// Validate errors (must be "java.lang.RuntimeException: Intentional Exception from onWebSocketConnect")
|
||||
assertThat("socket.onErrors", serverEndpoint.errors.size(), greaterThanOrEqualTo(1));
|
||||
Throwable cause = serverEndpoint.errors.poll(5, SECONDS);
|
||||
assertThat("Error type", cause, instanceOf(RuntimeException.class));
|
||||
// ... with optional ClosedChannelException
|
||||
cause = serverEndpoint.errors.peek();
|
||||
if (cause != null)
|
||||
{
|
||||
assertThat("Error type", cause, instanceOf(ClosedChannelException.class));
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
close(session);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void dropConnection() throws Exception
|
||||
{
|
||||
ClientUpgradeRequest request = new ClientUpgradeRequest();
|
||||
request.setSubProtocols("container");
|
||||
CloseTrackingEndpoint clientEndpoint = new CloseTrackingEndpoint();
|
||||
|
||||
URI wsUri = WSURI.toWebsocket(server.getURI().resolve("/ws"));
|
||||
Future<Session> futSession = client.connect(clientEndpoint, wsUri, request);
|
||||
|
||||
Session session = null;
|
||||
try(StacklessLogging ignore = new StacklessLogging(WebSocketSessionImpl.class))
|
||||
{
|
||||
session = futSession.get(5, SECONDS);
|
||||
|
||||
// Cause a client endpoint failure
|
||||
clientEndpoint.getEndPoint().close();
|
||||
|
||||
// Verify that client got close
|
||||
clientEndpoint.assertReceivedCloseEvent(5000, is(StatusCode.ABNORMAL), containsString("Disconnected"));
|
||||
|
||||
// Verify that server socket got close event
|
||||
AbstractCloseEndpoint serverEndpoint = serverEndpointCreator.pollLastCreated();
|
||||
serverEndpoint.assertReceivedCloseEvent(5000, is(StatusCode.ABNORMAL), containsString("Disconnected"));
|
||||
} finally
|
||||
{
|
||||
close(session);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Test session open session cleanup (bug #474936)
|
||||
*
|
||||
* @throws Exception on test failure
|
||||
*/
|
||||
@Test
|
||||
public void testOpenSessionCleanup() throws Exception
|
||||
{
|
||||
fastFail();
|
||||
fastClose();
|
||||
dropConnection();
|
||||
|
||||
ClientUpgradeRequest request = new ClientUpgradeRequest();
|
||||
request.setSubProtocols("container");
|
||||
CloseTrackingEndpoint clientEndpoint = new CloseTrackingEndpoint();
|
||||
|
||||
URI wsUri = WSURI.toWebsocket(server.getURI().resolve("/ws"));
|
||||
Future<Session> futSession = client.connect(clientEndpoint, wsUri, request);
|
||||
|
||||
Session session = null;
|
||||
try(StacklessLogging ignore = new StacklessLogging(WebSocketSessionImpl.class))
|
||||
{
|
||||
session = futSession.get(5, SECONDS);
|
||||
|
||||
session.getRemote().sendString("openSessions");
|
||||
|
||||
String msg = clientEndpoint.messageQueue.poll(5, SECONDS);
|
||||
|
||||
assertThat("Should only have 1 open session", msg, containsString("openSessions.size=1\n"));
|
||||
|
||||
// Verify that client got close
|
||||
clientEndpoint.assertReceivedCloseEvent(5000, is(StatusCode.NORMAL), containsString("ContainerEndpoint"));
|
||||
|
||||
// Verify that server socket got close event
|
||||
AbstractCloseEndpoint serverEndpoint = serverEndpointCreator.pollLastCreated();
|
||||
assertThat("Server Open Sessions Latch", serverEndpoint.closeLatch.await(5, SECONDS), is(true));
|
||||
assertThat("Server Open Sessions.statusCode", serverEndpoint.closeStatusCode, is(StatusCode.NORMAL));
|
||||
assertThat("Server Open Sessions.errors", serverEndpoint.errors.size(), is(0));
|
||||
}
|
||||
finally
|
||||
{
|
||||
close(session);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,76 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.websocket.tests.server;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.websocket.api.Session;
|
||||
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
|
||||
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
|
||||
|
||||
@WebSocket
|
||||
public class SlowServerEndpoint
|
||||
{
|
||||
private static final Logger LOG = Log.getLogger(SlowServerEndpoint.class);
|
||||
|
||||
@OnWebSocketMessage
|
||||
public void onMessage(Session session, String msg)
|
||||
{
|
||||
ThreadLocalRandom random = ThreadLocalRandom.current();
|
||||
|
||||
if (msg.startsWith("send-slow|"))
|
||||
{
|
||||
int idx = msg.indexOf('|');
|
||||
int msgCount = Integer.parseInt(msg.substring(idx + 1));
|
||||
CompletableFuture.runAsync(() ->
|
||||
{
|
||||
for (int i = 0; i < msgCount; i++)
|
||||
{
|
||||
try
|
||||
{
|
||||
session.getRemote().sendString("Hello/" + i + "/");
|
||||
// fake some slowness
|
||||
TimeUnit.MILLISECONDS.sleep(random.nextInt(2000));
|
||||
}
|
||||
catch (Throwable cause)
|
||||
{
|
||||
LOG.warn(cause);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
else
|
||||
{
|
||||
// echo message.
|
||||
try
|
||||
{
|
||||
session.getRemote().sendString(msg);
|
||||
}
|
||||
catch (IOException ignore)
|
||||
{
|
||||
LOG.ignore(ignore);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,143 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.websocket.tests.server;
|
||||
|
||||
import java.net.URI;
|
||||
import java.time.Duration;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
|
||||
import org.eclipse.jetty.server.Server;
|
||||
import org.eclipse.jetty.server.ServerConnector;
|
||||
import org.eclipse.jetty.server.handler.DefaultHandler;
|
||||
import org.eclipse.jetty.server.handler.HandlerList;
|
||||
import org.eclipse.jetty.servlet.ServletContextHandler;
|
||||
import org.eclipse.jetty.servlet.ServletHolder;
|
||||
import org.eclipse.jetty.websocket.api.Session;
|
||||
import org.eclipse.jetty.websocket.api.util.WSURI;
|
||||
import org.eclipse.jetty.websocket.client.WebSocketClient;
|
||||
import org.eclipse.jetty.websocket.server.JettyWebSocketServletContainerInitializer;
|
||||
import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
|
||||
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
|
||||
import org.eclipse.jetty.websocket.tests.CloseTrackingEndpoint;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import static java.util.concurrent.TimeUnit.SECONDS;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
|
||||
/**
|
||||
* This Regression Test Exists because of Server side Idle timeout, Write, and Generator bugs.
|
||||
*/
|
||||
public class SlowServerTest
|
||||
{
|
||||
private Server server;
|
||||
private WebSocketClient client;
|
||||
|
||||
@BeforeEach
|
||||
public void startClient() throws Exception
|
||||
{
|
||||
client = new WebSocketClient();
|
||||
client.setIdleTimeout(Duration.ofSeconds(60));
|
||||
client.start();
|
||||
}
|
||||
|
||||
@BeforeEach
|
||||
public void startServer() throws Exception
|
||||
{
|
||||
server = new Server();
|
||||
|
||||
ServerConnector connector = new ServerConnector(server);
|
||||
connector.setPort(0);
|
||||
server.addConnector(connector);
|
||||
|
||||
ServletContextHandler context = new ServletContextHandler();
|
||||
context.setContextPath("/");
|
||||
|
||||
ServletHolder websocket = new ServletHolder(new WebSocketServlet()
|
||||
{
|
||||
@Override
|
||||
public void configure(WebSocketServletFactory factory)
|
||||
{
|
||||
factory.register(SlowServerEndpoint.class);
|
||||
}
|
||||
});
|
||||
context.addServlet(websocket, "/ws");
|
||||
JettyWebSocketServletContainerInitializer.configure(context);
|
||||
|
||||
HandlerList handlers = new HandlerList();
|
||||
handlers.addHandler(context);
|
||||
handlers.addHandler(new DefaultHandler());
|
||||
|
||||
server.setHandler(handlers);
|
||||
|
||||
server.start();
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
public void stopClient() throws Exception
|
||||
{
|
||||
client.stop();
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
public void stopServer() throws Exception
|
||||
{
|
||||
server.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testServerSlowToSend() throws Exception
|
||||
{
|
||||
CloseTrackingEndpoint clientEndpoint = new CloseTrackingEndpoint();
|
||||
client.setIdleTimeout(Duration.ofSeconds(60));
|
||||
|
||||
URI wsUri = WSURI.toWebsocket(server.getURI().resolve("/ws"));
|
||||
Future<Session> future = client.connect(clientEndpoint, wsUri);
|
||||
|
||||
Session session = null;
|
||||
try
|
||||
{
|
||||
// Confirm connected
|
||||
session = future.get(5, SECONDS);
|
||||
|
||||
int messageCount = 10;
|
||||
|
||||
session.getRemote().sendString("send-slow|" + messageCount);
|
||||
|
||||
// Verify receive
|
||||
LinkedBlockingQueue<String> responses = clientEndpoint.messageQueue;
|
||||
|
||||
for (int i = 0; i < messageCount; i++)
|
||||
{
|
||||
String response = responses.poll(5, SECONDS);
|
||||
assertThat("Server Message[" + i + "]", response, is("Hello/" + i + "/"));
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (session != null)
|
||||
{
|
||||
session.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -16,7 +16,7 @@
|
|||
# ========================================================================
|
||||
#
|
||||
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
|
||||
org.eclipse.jetty.LEVEL=INFO
|
||||
org.eclipse.jetty.LEVEL=WARN
|
||||
# org.eclipse.jetty.io.LEVEL=DEBUG
|
||||
# org.eclipse.jetty.websocket.core.LEVEL=DEBUG
|
||||
# org.eclipse.jetty.util.log.stderr.LONG=true
|
||||
|
|
Loading…
Reference in New Issue