diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/TypeUtil.java b/jetty-util/src/main/java/org/eclipse/jetty/util/TypeUtil.java index 3f977980dca..a2e0e665f57 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/TypeUtil.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/TypeUtil.java @@ -18,7 +18,6 @@ package org.eclipse.jetty.util; -import java.io.File; import java.io.IOException; import java.lang.annotation.Annotation; import java.lang.reflect.Constructor; @@ -33,6 +32,7 @@ import java.security.ProtectionDomain; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; @@ -732,4 +732,22 @@ public class TypeUtil } return null; } + + public static Iterator concat(Iterator i1, Iterator i2) + { + return new Iterator<>() + { + @Override + public boolean hasNext() + { + return i1.hasNext() || i2.hasNext(); + } + + @Override + public T next() + { + return i1.hasNext() ? i1.next() : i2.next(); + } + }; + } } diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/FrameHandler.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/FrameHandler.java index 12b51ff5186..aa40f553f17 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/FrameHandler.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/FrameHandler.java @@ -409,7 +409,6 @@ public interface FrameHandler extends IncomingFrames @Override public void setWriteIdleTimeout(Duration timeout) { - } @Override diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/WebSocketWriteTimeoutException.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/WebSocketWriteTimeoutException.java index a3ae16ecf81..2dabed13133 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/WebSocketWriteTimeoutException.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/WebSocketWriteTimeoutException.java @@ -1,3 +1,21 @@ +// +// ======================================================================== +// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + package org.eclipse.jetty.websocket.core; public class WebSocketWriteTimeoutException extends WebSocketTimeoutException diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/client/ClientUpgradeRequest.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/client/ClientUpgradeRequest.java index b6f9c55dfe8..9ffb99b88b9 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/client/ClientUpgradeRequest.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/client/ClientUpgradeRequest.java @@ -53,6 +53,7 @@ import org.eclipse.jetty.util.QuotedStringTokenizer; import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.websocket.core.Behavior; import org.eclipse.jetty.websocket.core.ExtensionConfig; import org.eclipse.jetty.websocket.core.FrameHandler; @@ -349,7 +350,7 @@ public abstract class ClientUpgradeRequest extends HttpRequest implements Respon WebSocketChannel wsChannel = newWebSocketChannel(frameHandler, negotiated); wsClient.customize(wsChannel); - WebSocketConnection wsConnection = newWebSocketConnection(endp, httpClient.getExecutor(), httpClient.getByteBufferPool(), wsChannel); + WebSocketConnection wsConnection = newWebSocketConnection(endp, httpClient.getExecutor(), httpClient.getScheduler(), httpClient.getByteBufferPool(), wsChannel); for (Connection.Listener listener : wsClient.getBeans(Connection.Listener.class)) wsConnection.addListener(listener); @@ -379,9 +380,9 @@ public abstract class ClientUpgradeRequest extends HttpRequest implements Respon { } - protected WebSocketConnection newWebSocketConnection(EndPoint endp, Executor executor, ByteBufferPool byteBufferPool, WebSocketChannel wsChannel) + protected WebSocketConnection newWebSocketConnection(EndPoint endp, Executor executor, Scheduler scheduler, ByteBufferPool byteBufferPool, WebSocketChannel wsChannel) { - return new WebSocketConnection(endp, executor, byteBufferPool, wsChannel); + return new WebSocketConnection(endp, executor, scheduler, byteBufferPool, wsChannel); } protected WebSocketChannel newWebSocketChannel(FrameHandler handler, Negotiated negotiated) diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/FrameFlusher.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/FrameFlusher.java index ed8fa4b0e92..d953f222701 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/FrameFlusher.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/FrameFlusher.java @@ -23,20 +23,20 @@ import java.nio.channels.ClosedChannelException; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Deque; +import java.util.Iterator; import java.util.List; import java.util.Objects; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.LongAdder; import org.eclipse.jetty.io.ByteBufferPool; -import org.eclipse.jetty.io.CyclicTimeout; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.IteratingCallback; +import org.eclipse.jetty.util.TypeUtil; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; -import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler; import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.websocket.core.Frame; import org.eclipse.jetty.websocket.core.OpCode; @@ -54,9 +54,12 @@ public class FrameFlusher extends IteratingCallback private final Generator generator; private final int maxGather; private final Deque queue = new ArrayDeque<>(); - private final List entries; private final List buffers; - private final Timeout timeout; + private final Scheduler timeoutScheduler; + private final List entries; + private final List previousEntries; + private final List failedEntries; + private ByteBuffer batchBuffer = null; private boolean canEnqueue = true; private Throwable closedCause; @@ -64,7 +67,7 @@ public class FrameFlusher extends IteratingCallback private LongAdder bytesOut = new LongAdder(); private long idleTimeout = 0; - public FrameFlusher(ByteBufferPool bufferPool, Generator generator, EndPoint endPoint, int bufferSize, int maxGather) + public FrameFlusher(ByteBufferPool bufferPool, Scheduler scheduler, Generator generator, EndPoint endPoint, int bufferSize, int maxGather) { this.bufferPool = bufferPool; this.endPoint = endPoint; @@ -72,18 +75,10 @@ public class FrameFlusher extends IteratingCallback this.generator = Objects.requireNonNull(generator); this.maxGather = maxGather; this.entries = new ArrayList<>(maxGather); + this.previousEntries = new ArrayList<>(maxGather); + this.failedEntries = new ArrayList<>(maxGather); this.buffers = new ArrayList<>((maxGather * 2) + 1); - - ScheduledExecutorScheduler executor = new ScheduledExecutorScheduler(); - try - { - executor.start(); - } - catch (Exception e) - { - e.printStackTrace(); - } - this.timeout = new Timeout(executor); + this.timeoutScheduler = scheduler; } /** @@ -113,8 +108,8 @@ public class FrameFlusher extends IteratingCallback else queue.offerLast(entry); - if (idleTimeout > 0) - timeout.schedule(entry); + if ((idleTimeout > 0) && (queue.size()==1) && entries.isEmpty()) + timeoutScheduler.schedule(this::timeoutExpired, idleTimeout, TimeUnit.MILLISECONDS); if (opCode == OpCode.CLOSE) this.canEnqueue = false; @@ -160,10 +155,9 @@ public class FrameFlusher extends IteratingCallback if (closedCause != null) throw closedCause; - // Succeed entries from previous call to process - // and clear batchBuffer if we wrote it. - if (succeedEntries() && batchBuffer != null) - BufferUtil.clear(batchBuffer); + // Remember entries to succeed from previous process + previousEntries.addAll(entries); + entries.clear(); while (!queue.isEmpty() && entries.size() <= maxGather) { @@ -233,10 +227,20 @@ public class FrameFlusher extends IteratingCallback BufferUtil.toDetailString(batchBuffer), entries); + // succeed previous entries + for (Entry entry : previousEntries) + { + if (entry.frame.getOpCode() == OpCode.CLOSE) + endPoint.shutdownOutput(); + notifyCallbackSuccess(entry.callback); + entry.release(); + } + previousEntries.clear(); + + // If we did not get any new entries go to IDLE state if (entries.isEmpty()) { releaseAggregate(); - succeedEntries(); return Action.IDLE; } @@ -272,27 +276,47 @@ public class FrameFlusher extends IteratingCallback } } - @Override - public void succeeded() + public void timeoutExpired() { - succeedEntries(); - super.succeeded(); - } - - private boolean succeedEntries() - { - // todo: synchronize? - boolean hadEntries = false; - for (Entry entry : entries) + boolean failed = false; + synchronized (FrameFlusher.this) { - hadEntries = true; - if (entry.frame.getOpCode() == OpCode.CLOSE) - endPoint.shutdownOutput(); - notifyCallbackSuccess(entry.callback); - entry.release(); + if (closedCause != null) + return; + + long time = System.currentTimeMillis(); + long nextTimeout = -1; + + Iterator iterator = TypeUtil.concat(entries.iterator(), queue.iterator()); + + while (iterator.hasNext()) + { + Entry entry = iterator.next(); + long timeSinceCreation = time - entry.getTimeOfCreation(); + + if (timeSinceCreation >= idleTimeout) + { + LOG.warn("FrameFlusher write timeout on entry: {}", entry); + failed = true; + canEnqueue = false; + closedCause = new WebSocketWriteTimeoutException("FrameFlusher Write Timeout"); + failedEntries.addAll(entries); + failedEntries.addAll(queue); + entries.clear(); + queue.clear(); + break; + } + + if (timeSinceCreation > nextTimeout) + nextTimeout = timeSinceCreation; + } + + if (!failed && idleTimeout>0 && !entries.isEmpty()) + timeoutScheduler.schedule(this::timeoutExpired, nextTimeout, TimeUnit.MILLISECONDS); } - entries.clear(); - return hadEntries; + + if (failed) + this.iterate(); } @Override @@ -302,22 +326,26 @@ public class FrameFlusher extends IteratingCallback releaseAggregate(); synchronized (this) { - entries.addAll(queue); + failedEntries.addAll(queue); queue.clear(); + failedEntries.addAll(entries); + entries.clear(); + if (closedCause == null) closedCause = failure; else if (closedCause != failure) closedCause.addSuppressed(failure); } - for (Entry entry : entries) + for (Entry entry : failedEntries) { notifyCallbackFailure(entry.callback, failure); entry.release(); } - entries.clear(); - endPoint.close(failure); + + failedEntries.clear(); + endPoint.close(closedCause); } private void releaseAggregate() @@ -432,56 +460,4 @@ public class FrameFlusher extends IteratingCallback return String.format("%s{%s,%s,%b}", getClass().getSimpleName(), frame, callback, batch); } } - - public class Timeout extends CyclicTimeout - { - /** - * @param scheduler A scheduler used to schedule wakeups - */ - public Timeout(Scheduler scheduler) - { - super(scheduler); - } - - @Override - public void onTimeoutExpired() - { - LOG.warn("idle timeout expired"); - - synchronized (FrameFlusher.this) - { - long time = System.currentTimeMillis(); - long nextTimeout = -1; - - for (Entry e : entries) - { - long timeSinceCreation = time - e.getTimeOfCreation(); - - if (timeSinceCreation > idleTimeout) - { - LOG.warn("FrameFlusher write timeout on entry: {}", e); - // TODO: can we fail more gracefully, this exception must be tested in processConnectionError to avoid trying to write again - // TODO: is failed() good enough, what happens if the iterating callback goes into state - failed(new WebSocketWriteTimeoutException("FrameFlusher Write Timeout")); - return; - } - - if (timeSinceCreation > nextTimeout) - nextTimeout = timeSinceCreation; - } - - if (!entries.isEmpty()) - schedule(nextTimeout, TimeUnit.MILLISECONDS); - } - } - - public void schedule(Entry entry) - { - synchronized (FrameFlusher.this) - { - if (entries.isEmpty()) - schedule(idleTimeout, TimeUnit.MILLISECONDS); - } - } - } } diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketConnection.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketConnection.java index 0c940ee04a2..7ed7d3ba9a5 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketConnection.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketConnection.java @@ -36,6 +36,7 @@ import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.component.Dumpable; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.websocket.core.Behavior; import org.eclipse.jetty.websocket.core.Frame; import org.eclipse.jetty.websocket.core.MessageTooLargeException; @@ -79,10 +80,11 @@ public class WebSocketConnection extends AbstractConnection implements Connectio */ public WebSocketConnection(EndPoint endp, Executor executor, + Scheduler scheduler, ByteBufferPool bufferPool, WebSocketChannel channel) { - this(endp, executor, bufferPool, channel, true); + this(endp, executor, scheduler, bufferPool, channel, true); } /** @@ -94,6 +96,7 @@ public class WebSocketConnection extends AbstractConnection implements Connectio */ public WebSocketConnection(EndPoint endp, Executor executor, + Scheduler scheduler, ByteBufferPool bufferPool, WebSocketChannel channel, boolean validating) @@ -122,7 +125,7 @@ public class WebSocketConnection extends AbstractConnection implements Connectio }; - this.flusher = new Flusher(channel.getOutputBufferSize(), generator, endp); + this.flusher = new Flusher(scheduler, channel.getOutputBufferSize(), generator, endp); this.setInputBufferSize(channel.getInputBufferSize()); this.random = this.channel.getBehavior() == Behavior.CLIENT?new Random(endp.hashCode()):null; @@ -596,9 +599,9 @@ public class WebSocketConnection extends AbstractConnection implements Connectio private class Flusher extends FrameFlusher { - private Flusher(int bufferSize, Generator generator, EndPoint endpoint) + private Flusher(Scheduler scheduler, int bufferSize, Generator generator, EndPoint endpoint) { - super(bufferPool, generator, endpoint, bufferSize, 8); + super(bufferPool, scheduler, generator, endpoint, bufferSize, 8); } @Override diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/server/internal/RFC6455Handshaker.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/server/internal/RFC6455Handshaker.java index d976112bb37..b51e86b0218 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/server/internal/RFC6455Handshaker.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/server/internal/RFC6455Handshaker.java @@ -43,6 +43,7 @@ import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Response; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.websocket.core.Behavior; import org.eclipse.jetty.websocket.core.ExtensionConfig; import org.eclipse.jetty.websocket.core.FrameHandler; @@ -201,7 +202,7 @@ public final class RFC6455Handshaker implements Handshaker LOG.debug("channel {}", channel); // Create a connection - WebSocketConnection connection = newWebSocketConnection(httpChannel.getEndPoint(), connector.getExecutor(), connector.getByteBufferPool(), channel); + WebSocketConnection connection = newWebSocketConnection(httpChannel.getEndPoint(), connector.getExecutor(), connector.getScheduler(), connector.getByteBufferPool(), channel); if (LOG.isDebugEnabled()) LOG.debug("connection {}", connection); if (connection == null) @@ -241,9 +242,9 @@ public final class RFC6455Handshaker implements Handshaker return new WebSocketChannel(handler, Behavior.SERVER, negotiated); } - protected WebSocketConnection newWebSocketConnection(EndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool, WebSocketChannel wsChannel) + protected WebSocketConnection newWebSocketConnection(EndPoint endPoint, Executor executor, Scheduler scheduler, ByteBufferPool byteBufferPool, WebSocketChannel wsChannel) { - return new WebSocketConnection(endPoint, executor, byteBufferPool, wsChannel); + return new WebSocketConnection(endPoint, executor, scheduler, byteBufferPool, wsChannel); } private boolean getSendServerVersion(Connector connector) diff --git a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/internal/FrameFlusherTest.java b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/internal/FrameFlusherTest.java index 2ae62f35efe..c751ccbddd4 100644 --- a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/internal/FrameFlusherTest.java +++ b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/internal/FrameFlusherTest.java @@ -37,11 +37,15 @@ import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.FutureCallback; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler; +import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.websocket.core.CloseStatus; import org.eclipse.jetty.websocket.core.Frame; import org.eclipse.jetty.websocket.core.OpCode; import org.eclipse.jetty.websocket.core.WebSocketConstants; import org.eclipse.jetty.websocket.core.WebSocketWriteTimeoutException; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import static java.nio.charset.StandardCharsets.UTF_8; @@ -53,7 +57,21 @@ import static org.junit.jupiter.api.Assertions.assertTrue; public class FrameFlusherTest { - public ByteBufferPool bufferPool = new MappedByteBufferPool(); + private ByteBufferPool bufferPool = new MappedByteBufferPool(); + private Scheduler scheduler; + + @BeforeEach + public void start() throws Exception + { + scheduler = new ScheduledExecutorScheduler(); + scheduler.start(); + } + + @AfterEach + public void stop() throws Exception + { + scheduler.stop(); + } /** * Ensure post-close frames have their associated callbacks properly notified. @@ -65,7 +83,7 @@ public class FrameFlusherTest CapturingEndPoint endPoint = new CapturingEndPoint(bufferPool); int bufferSize = WebSocketConstants.DEFAULT_MAX_TEXT_MESSAGE_SIZE; int maxGather = 1; - FrameFlusher frameFlusher = new FrameFlusher(bufferPool, generator, endPoint, bufferSize, maxGather); + FrameFlusher frameFlusher = new FrameFlusher(bufferPool, scheduler, generator, endPoint, bufferSize, maxGather); Frame closeFrame = new Frame(OpCode.CLOSE).setPayload(CloseStatus.asPayloadBuffer(CloseStatus.MESSAGE_TOO_LARGE, "Message be to big")); Frame textFrame = new Frame(OpCode.TEXT).setPayload("Hello").setFin(true); @@ -96,7 +114,7 @@ public class FrameFlusherTest CapturingEndPoint endPoint = new CapturingEndPoint(bufferPool); int bufferSize = WebSocketConstants.DEFAULT_MAX_TEXT_MESSAGE_SIZE; int maxGather = 8; - FrameFlusher frameFlusher = new FrameFlusher(bufferPool, generator, endPoint, bufferSize, maxGather); + FrameFlusher frameFlusher = new FrameFlusher(bufferPool, scheduler, generator, endPoint, bufferSize, maxGather); int largeMessageSize = 60000; byte[] buf = new byte[largeMessageSize]; @@ -147,10 +165,9 @@ public class FrameFlusherTest int bufferSize = WebSocketConstants.DEFAULT_MAX_TEXT_MESSAGE_SIZE; int maxGather = 8; - CountDownLatch flusherFailure = new CountDownLatch(1); AtomicReference error = new AtomicReference<>(); - FrameFlusher frameFlusher = new FrameFlusher(bufferPool, generator, endPoint, bufferSize, maxGather) + FrameFlusher frameFlusher = new FrameFlusher(bufferPool, scheduler, generator, endPoint, bufferSize, maxGather) { @Override public void onCompleteFailure(Throwable failure) @@ -161,7 +178,7 @@ public class FrameFlusherTest } }; - frameFlusher.setIdleTimeout(150); + frameFlusher.setIdleTimeout(100); endPoint.setBlockTime(200); Frame frame = new Frame(OpCode.TEXT).setPayload("message").setFin(true);