Merge remote-tracking branch 'origin/jetty-9.4.x-4824-WSmaxOutgoingFrames' into jetty-10.0.x

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
Lachlan Roberts 2020-09-10 07:45:42 +10:00
commit b7fb631b2d
9 changed files with 296 additions and 7 deletions

View File

@ -74,6 +74,28 @@ public interface Configuration
void setMaxTextMessageSize(long maxSize);
/**
* Get the maximum number of data frames allowed to be waiting to be sent at any one time.
* The default value is -1, this indicates there is no limit on how many frames can be
* queued to be sent by the implementation. If the limit is exceeded, subsequent frames
* sent are failed with a {@link java.nio.channels.WritePendingException} but
* the connection is not failed and will remain open.
*
* @return the max number of frames.
*/
int getMaxOutgoingFrames();
/**
* Set the maximum number of data frames allowed to be waiting to be sent at any one time.
* The default value is -1, this indicates there is no limit on how many frames can be
* queued to be sent by the implementation. If the limit is exceeded, subsequent frames
* sent are failed with a {@link java.nio.channels.WritePendingException} but
* the connection is not failed and will remain open.
*
* @param maxOutgoingFrames the max number of frames.
*/
void setMaxOutgoingFrames(int maxOutgoingFrames);
interface Customizer
{
void customize(Configuration configurable);
@ -89,6 +111,7 @@ public interface Configuration
private Integer inputBufferSize;
private Long maxBinaryMessageSize;
private Long maxTextMessageSize;
private Integer maxOutgoingFrames;
@Override
public Duration getIdleTimeout()
@ -186,6 +209,18 @@ public interface Configuration
this.maxTextMessageSize = maxTextMessageSize;
}
@Override
public int getMaxOutgoingFrames()
{
return maxOutgoingFrames == null ? WebSocketConstants.DEFAULT_MAX_OUTGOING_FRAMES : maxOutgoingFrames;
}
@Override
public void setMaxOutgoingFrames(int maxOutgoingFrames)
{
this.maxOutgoingFrames = maxOutgoingFrames;
}
@Override
public void customize(Configuration configurable)
{
@ -205,6 +240,8 @@ public interface Configuration
configurable.setMaxBinaryMessageSize(maxBinaryMessageSize);
if (maxTextMessageSize != null)
configurable.setMaxTextMessageSize(maxTextMessageSize);
if (maxOutgoingFrames != null)
configurable.setMaxOutgoingFrames(maxOutgoingFrames);
}
}
}

View File

@ -32,6 +32,7 @@ public final class WebSocketConstants
public static final int DEFAULT_MAX_FRAME_SIZE = 64 * 1024;
public static final int DEFAULT_INPUT_BUFFER_SIZE = 4 * 1024;
public static final int DEFAULT_OUTPUT_BUFFER_SIZE = 4 * 1024;
public static final int DEFAULT_MAX_OUTGOING_FRAMES = -1;
public static final boolean DEFAULT_AUTO_FRAGMENT = true;
public static final Duration DEFAULT_IDLE_TIMEOUT = Duration.ofSeconds(30);
public static final Duration DEFAULT_WRITE_TIMEOUT = Duration.ZERO;
@ -41,7 +42,6 @@ public final class WebSocketConstants
* <p>
* See <a href="https://tools.ietf.org/html/rfc6455#section-1.3">Opening Handshake (Section 1.3)</a>
*/
@SuppressWarnings("SpellCheckingInspection")
public static final byte[] MAGIC = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11".getBytes(StandardCharsets.ISO_8859_1);
private WebSocketConstants()

View File

@ -22,11 +22,13 @@ import java.io.IOException;
import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.nio.channels.WritePendingException;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.Callback;
@ -71,6 +73,9 @@ public class WebSocketCoreSession implements IncomingFrames, CoreSession, Dumpab
private final boolean demanding;
private final Flusher flusher = new Flusher(this);
private int maxOutgoingFrames = -1;
private final AtomicInteger numOutgoingFrames = new AtomicInteger();
private WebSocketConnection connection;
private boolean autoFragment = WebSocketConstants.DEFAULT_AUTO_FRAGMENT;
private long maxFrameSize = WebSocketConstants.DEFAULT_MAX_FRAME_SIZE;
@ -522,6 +527,17 @@ public class WebSocketCoreSession implements IncomingFrames, CoreSession, Dumpab
@Override
public void sendFrame(Frame frame, Callback callback, boolean batch)
{
if (maxOutgoingFrames > 0 && frame.isDataFrame())
{
// Increase the number of outgoing frames, will be decremented when callback is completed.
callback = Callback.from(callback, numOutgoingFrames::decrementAndGet);
if (numOutgoingFrames.incrementAndGet() > maxOutgoingFrames)
{
callback.failed(new WritePendingException());
return;
}
}
try
{
assertValidOutgoing(frame);
@ -543,9 +559,10 @@ public class WebSocketCoreSession implements IncomingFrames, CoreSession, Dumpab
boolean closeConnection = sessionState.onOutgoingFrame(frame);
if (closeConnection)
{
Callback c = callback;
Callback closeConnectionCallback = Callback.from(
() -> closeConnection(sessionState.getCloseStatus(), callback),
t -> closeConnection(sessionState.getCloseStatus(), Callback.from(callback, t)));
() -> closeConnection(sessionState.getCloseStatus(), c),
t -> closeConnection(sessionState.getCloseStatus(), Callback.from(c, t)));
flusher.sendFrame(frame, closeConnectionCallback, false);
}
@ -662,6 +679,18 @@ public class WebSocketCoreSession implements IncomingFrames, CoreSession, Dumpab
maxTextMessageSize = maxSize;
}
@Override
public int getMaxOutgoingFrames()
{
return maxOutgoingFrames;
}
@Override
public void setMaxOutgoingFrames(int maxOutgoingFrames)
{
this.maxOutgoingFrames = maxOutgoingFrames;
}
private class IncomingAdaptor implements IncomingFrames
{
@Override

View File

@ -122,7 +122,7 @@ public abstract class Negotiation
? Collections.emptyList()
: extensions.getValues().stream()
.map(ExtensionConfig::parse)
.filter(ec -> available.contains(ec.getName().toLowerCase()) && !ec.getName().startsWith("@"))
.filter(ec -> available.contains(ec.getName()) && !ec.getName().startsWith("@"))
.collect(Collectors.toList());
// Remove any parameters starting with "@", these are not to be negotiated by client (internal parameters).

View File

@ -163,6 +163,28 @@ public interface RemoteEndpoint
*/
void setBatchMode(BatchMode mode);
/**
* Get the maximum number of frames which allowed to be waiting to be sent at any one time.
* The default value is -1, this indicates there is no limit on how many frames can be
* queued to be sent by the implementation. If the limit is exceeded, subsequent frames
* sent are failed with a {@link java.nio.channels.WritePendingException} but
* the connection is not failed and will remain open.
*
* @return the max number of frames.
*/
int getMaxOutgoingFrames();
/**
* Set the maximum number of frames which allowed to be waiting to be sent at any one time.
* The default value is -1, this indicates there is no limit on how many frames can be
* queued to be sent by the implementation. If the limit is exceeded, subsequent frames
* sent are failed with a {@link java.nio.channels.WritePendingException} but
* the connection is not failed and will remain open.
*
* @param maxOutgoingFrames the max number of frames.
*/
void setMaxOutgoingFrames(int maxOutgoingFrames);
/**
* Get the SocketAddress for the established connection.
*

View File

@ -18,13 +18,17 @@
package org.eclipse.jetty.websocket.client.impl;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.eclipse.jetty.client.HttpResponse;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
@ -69,7 +73,9 @@ public class DelegatedJettyClientUpgradeResponse implements UpgradeResponse
@Override
public Map<String, List<String>> getHeaders()
{
return null;
Map<String, List<String>> headers = getHeaderNames().stream()
.collect(Collectors.toMap((name) -> name, (name) -> new ArrayList<>(getHeaders(name))));
return Collections.unmodifiableMap(headers);
}
@Override

View File

@ -240,6 +240,18 @@ public class JettyWebSocketRemoteEndpoint implements org.eclipse.jetty.websocket
batchMode = mode;
}
@Override
public int getMaxOutgoingFrames()
{
return coreSession.getMaxOutgoingFrames();
}
@Override
public void setMaxOutgoingFrames(int maxOutgoingFrames)
{
coreSession.setMaxOutgoingFrames(maxOutgoingFrames);
}
private boolean isBatch()
{
return BatchMode.ON == batchMode;

View File

@ -0,0 +1,184 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.tests;
import java.net.URI;
import java.nio.channels.WritePendingException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.core.AbstractExtension;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.WebSocketComponents;
import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient;
import org.eclipse.jetty.websocket.core.server.WebSocketServerComponents;
import org.eclipse.jetty.websocket.server.config.JettyWebSocketServletContainerInitializer;
import org.eclipse.jetty.websocket.tests.util.FutureWriteCallback;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class MaxOutgoingFramesTest
{
public static CountDownLatch outgoingBlocked;
public static CountDownLatch firstFrameBlocked;
private final EventSocket serverSocket = new EventSocket();
private Server server;
private ServerConnector connector;
private WebSocketClient client;
@BeforeEach
public void start() throws Exception
{
outgoingBlocked = new CountDownLatch(1);
firstFrameBlocked = new CountDownLatch(1);
server = new Server();
connector = new ServerConnector(server);
server.addConnector(connector);
ServletContextHandler contextHandler = new ServletContextHandler(ServletContextHandler.SESSIONS);
contextHandler.setContextPath("/");
JettyWebSocketServletContainerInitializer.configure(contextHandler, (context, container) ->
{
container.addMapping("/", (req, resp) -> serverSocket);
WebSocketComponents components = WebSocketServerComponents.ensureWebSocketComponents(context);
components.getExtensionRegistry().register(BlockingOutgoingExtension.class.getName(), BlockingOutgoingExtension.class);
});
server.setHandler(contextHandler);
client = new WebSocketClient();
server.start();
client.start();
}
@AfterEach
public void stop() throws Exception
{
outgoingBlocked.countDown();
server.stop();
client.stop();
}
public static class BlockingOutgoingExtension extends AbstractExtension
{
@Override
public String getName()
{
return BlockingOutgoingExtension.class.getName();
}
@Override
public void sendFrame(Frame frame, Callback callback, boolean batch)
{
try
{
firstFrameBlocked.countDown();
outgoingBlocked.await();
super.sendFrame(frame, callback, batch);
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
}
}
public static class CountingCallback implements WriteCallback
{
private final CountDownLatch successes;
public CountingCallback(int count)
{
successes = new CountDownLatch(count);
}
@Override
public void writeSuccess()
{
successes.countDown();
}
@Override
public void writeFailed(Throwable t)
{
t.printStackTrace();
}
}
@Test
public void testMaxOutgoingFrames() throws Exception
{
// We need to have the frames queued but not yet sent, we do this by blocking in the ExtensionStack.
WebSocketCoreClient coreClient = client.getBean(WebSocketCoreClient.class);
coreClient.getExtensionRegistry().register(BlockingOutgoingExtension.class.getName(), BlockingOutgoingExtension.class);
URI uri = URI.create("ws://localhost:" + connector.getLocalPort() + "/");
EventSocket socket = new EventSocket();
ClientUpgradeRequest upgradeRequest = new ClientUpgradeRequest();
upgradeRequest.addExtensions(BlockingOutgoingExtension.class.getName());
client.connect(socket, uri, upgradeRequest).get(5, TimeUnit.SECONDS);
assertTrue(socket.openLatch.await(5, TimeUnit.SECONDS));
int numFrames = 30;
RemoteEndpoint remote = socket.session.getRemote();
remote.setMaxOutgoingFrames(numFrames);
// Verify that we can send up to numFrames without any problem.
// First send will block in the Extension so it needs to be done in new thread, others frames will be queued.
CountingCallback countingCallback = new CountingCallback(numFrames);
new Thread(() -> remote.sendString("0", countingCallback)).start();
assertTrue(firstFrameBlocked.await(5, TimeUnit.SECONDS));
for (int i = 1; i < numFrames; i++)
{
remote.sendString(Integer.toString(i), countingCallback);
}
// Sending any more frames will result in WritePendingException.
FutureWriteCallback callback = new FutureWriteCallback();
remote.sendString("fail", callback);
ExecutionException executionException = assertThrows(ExecutionException.class, () -> callback.get(5, TimeUnit.SECONDS));
assertThat(executionException.getCause(), instanceOf(WritePendingException.class));
// Check that all callbacks are succeeded when the server processes the frames.
outgoingBlocked.countDown();
assertTrue(countingCallback.successes.await(5, TimeUnit.SECONDS));
// Close successfully.
socket.session.close();
assertTrue(serverSocket.closeLatch.await(5, TimeUnit.SECONDS));
assertTrue(socket.closeLatch.await(5, TimeUnit.SECONDS));
}
}

View File

@ -128,8 +128,7 @@ public class ServerUpgradeResponse
public Map<String, List<String>> getHeadersMap()
{
Map<String, List<String>> headers = response.getHeaderNames().stream()
.collect(Collectors.toMap((name) -> name,
(name) -> new ArrayList<>(response.getHeaders(name))));
.collect(Collectors.toMap((name) -> name, (name) -> new ArrayList<>(response.getHeaders(name))));
return Collections.unmodifiableMap(headers);
}