diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/AuthenticationProtocolHandler.java b/jetty-client/src/main/java/org/eclipse/jetty/client/AuthenticationProtocolHandler.java index 3fe68cc340d..5531f289740 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/AuthenticationProtocolHandler.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/AuthenticationProtocolHandler.java @@ -27,6 +27,7 @@ import java.util.regex.Pattern; import org.eclipse.jetty.client.api.Authentication; import org.eclipse.jetty.client.api.Connection; +import org.eclipse.jetty.client.api.ContentProvider; import org.eclipse.jetty.client.api.ContentResponse; import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Response; @@ -86,7 +87,7 @@ public abstract class AuthenticationProtocolHandler implements ProtocolHandler { HttpRequest request = (HttpRequest)result.getRequest(); ContentResponse response = new HttpContentResponse(result.getResponse(), getContent(), getMediaType(), getEncoding()); - if (result.isFailed()) + if (result.getResponseFailure() != null) { if (LOG.isDebugEnabled()) LOG.debug("Authentication challenge failed {}", result.getFailure()); @@ -98,7 +99,7 @@ public abstract class AuthenticationProtocolHandler implements ProtocolHandler HttpConversation conversation = request.getConversation(); if (conversation.getAttribute(authenticationAttribute) != null) { - // We have already tried to authenticate, but we failed again + // We have already tried to authenticate, but we failed again. if (LOG.isDebugEnabled()) LOG.debug("Bad credentials for {}", request); forwardSuccessComplete(request, response); @@ -111,7 +112,7 @@ public abstract class AuthenticationProtocolHandler implements ProtocolHandler { if (LOG.isDebugEnabled()) LOG.debug("Authentication challenge without {} header", header); - forwardFailureComplete(request, null, response, new HttpResponseException("HTTP protocol violation: Authentication challenge without " + header + " header", response)); + forwardFailureComplete(request, result.getRequestFailure(), response, new HttpResponseException("HTTP protocol violation: Authentication challenge without " + header + " header", response)); return; } @@ -138,9 +139,18 @@ public abstract class AuthenticationProtocolHandler implements ProtocolHandler return; } + ContentProvider requestContent = request.getContent(); + if (requestContent != null && !requestContent.isReproducible()) + { + if (LOG.isDebugEnabled()) + LOG.debug("Request content not reproducible for {}", request); + forwardSuccessComplete(request, response); + return; + } + try { - final Authentication.Result authnResult = authentication.authenticate(request, response, headerInfo, conversation); + Authentication.Result authnResult = authentication.authenticate(request, response, headerInfo, conversation); if (LOG.isDebugEnabled()) LOG.debug("Authentication result {}", authnResult); if (authnResult == null) diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/api/ContentProvider.java b/jetty-client/src/main/java/org/eclipse/jetty/client/api/ContentProvider.java index 03754f72c9b..6419b11e8f4 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/api/ContentProvider.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/api/ContentProvider.java @@ -22,6 +22,7 @@ import java.io.Closeable; import java.nio.ByteBuffer; import java.util.Iterator; +import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.util.ByteBufferContentProvider; import org.eclipse.jetty.client.util.PathContentProvider; @@ -48,6 +49,22 @@ public interface ContentProvider extends Iterable */ long getLength(); + /** + *

Whether this ContentProvider can produce exactly the same content more + * than once.

+ *

Implementations should return {@code true} only if the content can be + * produced more than once, which means that invocations to {@link #iterator()} + * must return a new, independent, iterator instance over the content.

+ *

The {@link HttpClient} implementation may use this method in particular + * cases where it detects that it is safe to retry a request that failed.

+ * + * @return whether the content can be produced more than once + */ + default boolean isReproducible() + { + return false; + } + /** * An extension of {@link ContentProvider} that provides a content type string * to be used as a {@code Content-Type} HTTP header in requests. diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/util/ByteBufferContentProvider.java b/jetty-client/src/main/java/org/eclipse/jetty/client/util/ByteBufferContentProvider.java index 68386758da9..2bc5233f8be 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/util/ByteBufferContentProvider.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/util/ByteBufferContentProvider.java @@ -57,6 +57,12 @@ public class ByteBufferContentProvider extends AbstractTypedContentProvider return length; } + @Override + public boolean isReproducible() + { + return true; + } + @Override public Iterator iterator() { @@ -85,12 +91,6 @@ public class ByteBufferContentProvider extends AbstractTypedContentProvider throw new NoSuchElementException(); } } - - @Override - public void remove() - { - throw new UnsupportedOperationException(); - } }; } } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/util/BytesContentProvider.java b/jetty-client/src/main/java/org/eclipse/jetty/client/util/BytesContentProvider.java index d2c080aa700..39f5fd5ad51 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/util/BytesContentProvider.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/util/BytesContentProvider.java @@ -53,6 +53,12 @@ public class BytesContentProvider extends AbstractTypedContentProvider return length; } + @Override + public boolean isReproducible() + { + return true; + } + @Override public Iterator iterator() { @@ -78,12 +84,6 @@ public class BytesContentProvider extends AbstractTypedContentProvider throw new NoSuchElementException(); } } - - @Override - public void remove() - { - throw new UnsupportedOperationException(); - } }; } } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/util/PathContentProvider.java b/jetty-client/src/main/java/org/eclipse/jetty/client/util/PathContentProvider.java index d1051d94136..00147a556df 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/util/PathContentProvider.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/util/PathContentProvider.java @@ -85,6 +85,12 @@ public class PathContentProvider extends AbstractTypedContentProvider return fileSize; } + @Override + public boolean isReproducible() + { + return true; + } + public ByteBufferPool getByteBufferPool() { return bufferPool; diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/AbstractHttpClientServerTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/AbstractHttpClientServerTest.java index b591cd7481f..4eea728f923 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/AbstractHttpClientServerTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/AbstractHttpClientServerTest.java @@ -27,6 +27,7 @@ import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.toolchain.test.TestTracker; +import org.eclipse.jetty.util.SocketAddressResolver; import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.junit.After; @@ -96,6 +97,7 @@ public abstract class AbstractHttpClientServerTest clientThreads.setName("client"); client = new HttpClient(transport, sslContextFactory); client.setExecutor(clientThreads); + client.setSocketAddressResolver(new SocketAddressResolver.Sync()); client.start(); } diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientAuthenticationTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientAuthenticationTest.java index 6e1a5ca074d..14d23267228 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientAuthenticationTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientAuthenticationTest.java @@ -22,11 +22,14 @@ import java.io.File; import java.io.IOException; import java.net.URI; import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; +import java.util.Iterator; +import java.util.NoSuchElementException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.IntFunction; import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; @@ -34,6 +37,7 @@ import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.client.api.Authentication; import org.eclipse.jetty.client.api.AuthenticationStore; +import org.eclipse.jetty.client.api.ContentProvider; import org.eclipse.jetty.client.api.ContentResponse; import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Response; @@ -41,6 +45,7 @@ import org.eclipse.jetty.client.api.Result; import org.eclipse.jetty.client.util.BasicAuthentication; import org.eclipse.jetty.client.util.DeferredContentProvider; import org.eclipse.jetty.client.util.DigestAuthentication; +import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.security.Authenticator; import org.eclipse.jetty.security.ConstraintMapping; import org.eclipse.jetty.security.ConstraintSecurityHandler; @@ -220,7 +225,7 @@ public class HttpClientAuthenticationTest extends AbstractHttpClientServerTest { baseRequest.setHandled(true); if (requests.incrementAndGet() == 1) - response.sendRedirect(URIUtil.newURI(scheme,request.getServerName(),request.getServerPort(),request.getRequestURI(),null)); + response.sendRedirect(URIUtil.newURI(scheme, request.getServerName(), request.getServerPort(), request.getRequestURI(), null)); } }); @@ -259,7 +264,7 @@ public class HttpClientAuthenticationTest extends AbstractHttpClientServerTest { baseRequest.setHandled(true); if (request.getRequestURI().endsWith("/redirect")) - response.sendRedirect(URIUtil.newURI(scheme,request.getServerName(),request.getServerPort(),"/secure",null)); + response.sendRedirect(URIUtil.newURI(scheme, request.getServerName(), request.getServerPort(), "/secure", null)); } }); @@ -424,6 +429,40 @@ public class HttpClientAuthenticationTest extends AbstractHttpClientServerTest Assert.assertEquals(1, requests.get()); } + @Test + public void test_NonReproducibleContent() throws Exception + { + startBasic(new EmptyServerHandler()); + + AuthenticationStore authenticationStore = client.getAuthenticationStore(); + URI uri = URI.create(scheme + "://localhost:" + connector.getLocalPort()); + BasicAuthentication authentication = new BasicAuthentication(uri, realm, "basic", "basic"); + authenticationStore.addAuthentication(authentication); + + CountDownLatch resultLatch = new CountDownLatch(1); + byte[] data = new byte[]{'h', 'e', 'l', 'l', 'o'}; + DeferredContentProvider content = new DeferredContentProvider(ByteBuffer.wrap(data)) + { + @Override + public boolean isReproducible() + { + return false; + } + }; + Request request = client.newRequest(uri) + .path("/secure") + .content(content); + request.send(result -> + { + if (result.isSucceeded() && result.getResponse().getStatus() == HttpStatus.UNAUTHORIZED_401) + resultLatch.countDown(); + }); + + content.close(); + + Assert.assertTrue(resultLatch.await(5, TimeUnit.SECONDS)); + } + @Test public void test_RequestFailsAfterResponse() throws Exception { @@ -434,32 +473,111 @@ public class HttpClientAuthenticationTest extends AbstractHttpClientServerTest BasicAuthentication authentication = new BasicAuthentication(uri, realm, "basic", "basic"); authenticationStore.addAuthentication(authentication); - CountDownLatch successLatch = new CountDownLatch(1); + AtomicBoolean fail = new AtomicBoolean(true); + GeneratingContentProvider content = new GeneratingContentProvider(index -> + { + switch (index) + { + case 0: + return ByteBuffer.wrap(new byte[]{'h', 'e', 'l', 'l', 'o'}); + case 1: + return ByteBuffer.wrap(new byte[]{'w', 'o', 'r', 'l', 'd'}); + case 2: + if (fail.compareAndSet(true, false)) + { + // Wait for the 401 response to arrive + // to the authentication protocol handler. + sleep(1000); + // Trigger request failure. + throw new RuntimeException(); + } + else + { + return null; + } + default: + throw new IllegalStateException(); + } + }); CountDownLatch resultLatch = new CountDownLatch(1); - DeferredContentProvider content = new DeferredContentProvider(); - Request request = client.newRequest("localhost", connector.getLocalPort()) + client.newRequest("localhost", connector.getLocalPort()) .scheme(scheme) .path("/secure") .content(content) - .onResponseSuccess(response -> successLatch.countDown()); - request.send(result -> - { - if (result.isFailed() && result.getResponseFailure() == null) - resultLatch.countDown(); - }); + .send(result -> + { + if (result.isSucceeded() && result.getResponse().getStatus() == HttpStatus.OK_200) + resultLatch.countDown(); + }); - // Send some content to make sure the request is dispatched on the server. - content.offer(ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8))); - - // Wait for the response to arrive to - // the authentication protocol handler. - Thread.sleep(1000); - - // Trigger request failure. - request.abort(new Exception()); - - // Verify that the response was successful, it's the request that failed. - Assert.assertTrue(successLatch.await(5, TimeUnit.SECONDS)); Assert.assertTrue(resultLatch.await(5, TimeUnit.SECONDS)); } + + private void sleep(long time) + { + try + { + Thread.sleep(time); + } + catch (InterruptedException x) + { + throw new RuntimeException(x); + } + } + + private static class GeneratingContentProvider implements ContentProvider + { + private static final ByteBuffer DONE = ByteBuffer.allocate(0); + + private final IntFunction generator; + + private GeneratingContentProvider(IntFunction generator) + { + this.generator = generator; + } + + @Override + public long getLength() + { + return -1; + } + + @Override + public boolean isReproducible() + { + return true; + } + + @Override + public Iterator iterator() + { + return new Iterator() + { + private int index; + public ByteBuffer current; + + @Override + public boolean hasNext() + { + if (current == null) + { + current = generator.apply(index++); + if (current == null) + current = DONE; + } + return current != DONE; + } + + @Override + public ByteBuffer next() + { + ByteBuffer result = current; + current = null; + if (result == null) + throw new NoSuchElementException(); + return result; + } + }; + } + } } diff --git a/jetty-documentation/src/main/asciidoc/development/websockets/jetty/jetty-websocket-api-send-message.adoc b/jetty-documentation/src/main/asciidoc/development/websockets/jetty/jetty-websocket-api-send-message.adoc index d89a77bf391..49afbe47204 100644 --- a/jetty-documentation/src/main/asciidoc/development/websockets/jetty/jetty-websocket-api-send-message.adoc +++ b/jetty-documentation/src/main/asciidoc/development/websockets/jetty/jetty-websocket-api-send-message.adoc @@ -17,7 +17,7 @@ [[jetty-websocket-api-send-message]] === Send Messages to Remote Endpoint -The most important feature of the Session is access to the link:{JDURL}/org/eclipse/jetty/websocket/api/RemoteEndpoint.html[`org.eclipse.jetty.websocket.api.RemoteEndpoint`]needed to send messages. +The most important feature of the Session is access to the link:{JDURL}/org/eclipse/jetty/websocket/api/RemoteEndpoint.html[`org.eclipse.jetty.websocket.api.RemoteEndpoint`] needed to send messages. With RemoteEndpoint you can choose to send TEXT or BINARY WebSocket messages, or the WebSocket PING and PONG control frames. diff --git a/jetty-documentation/src/main/asciidoc/development/websockets/jetty/jetty-websocket-api-session.adoc b/jetty-documentation/src/main/asciidoc/development/websockets/jetty/jetty-websocket-api-session.adoc index abcee12d5f5..754e0044c0e 100644 --- a/jetty-documentation/src/main/asciidoc/development/websockets/jetty/jetty-websocket-api-session.adoc +++ b/jetty-documentation/src/main/asciidoc/development/websockets/jetty/jetty-websocket-api-session.adoc @@ -44,7 +44,7 @@ What was in the Upgrade Request and Response. UpgradeRequest req = session.getUpgradeRequest(); String channelName = req.getParameterMap().get("channelName"); -UpgradeRespons resp = session.getUpgradeResponse(); +UpgradeResponse resp = session.getUpgradeResponse(); String subprotocol = resp.getAcceptedSubProtocol(); ---- diff --git a/jetty-hazelcast/src/main/java/org/eclipse/jetty/hazelcast/session/HazelcastSessionDataStoreFactory.java b/jetty-hazelcast/src/main/java/org/eclipse/jetty/hazelcast/session/HazelcastSessionDataStoreFactory.java index 7959f8a5072..766920ed9c3 100644 --- a/jetty-hazelcast/src/main/java/org/eclipse/jetty/hazelcast/session/HazelcastSessionDataStoreFactory.java +++ b/jetty-hazelcast/src/main/java/org/eclipse/jetty/hazelcast/session/HazelcastSessionDataStoreFactory.java @@ -121,6 +121,11 @@ public class HazelcastSessionDataStoreFactory return onlyClient; } + /** + * + * @param onlyClient if true the session manager will only connect to an external Hazelcast instance + * and not use this JVM to start an Hazelcast instance + */ public void setOnlyClient( boolean onlyClient ) { this.onlyClient = onlyClient; diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java index 55572146c21..babbbf9fa84 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java @@ -28,6 +28,7 @@ import org.eclipse.jetty.http2.parser.Parser; import org.eclipse.jetty.io.AbstractConnection; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.EndPoint; +import org.eclipse.jetty.io.WriteFlusher; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.component.LifeCycle; @@ -37,7 +38,7 @@ import org.eclipse.jetty.util.thread.ExecutionStrategy; import org.eclipse.jetty.util.thread.ReservedThreadExecutor; import org.eclipse.jetty.util.thread.strategy.EatWhatYouKill; -public class HTTP2Connection extends AbstractConnection +public class HTTP2Connection extends AbstractConnection implements WriteFlusher.Listener { protected static final Logger LOG = Log.getLogger(HTTP2Connection.class); @@ -176,6 +177,13 @@ public class HTTP2Connection extends AbstractConnection } } + @Override + public void onFlushed(long bytes) throws IOException + { + // TODO: add method to ISession ? + ((HTTP2Session)session).onFlushed(bytes); + } + protected class HTTP2Producer implements ExecutionStrategy.Producer { private final Callback fillableCallback = new FillableCallback(); diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java index b3bb0ccf97a..94452f24ccc 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java @@ -30,6 +30,7 @@ import org.eclipse.jetty.http2.frames.Frame; import org.eclipse.jetty.http2.frames.WindowUpdateFrame; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.EofException; +import org.eclipse.jetty.io.WriteFlusher; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.IteratingCallback; import org.eclipse.jetty.util.component.ContainerLifeCycle; @@ -175,7 +176,7 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable { if (entry.generate(lease)) { - if (entry.dataRemaining() > 0) + if (entry.getDataBytesRemaining() > 0) entries.offer(entry); } else @@ -207,6 +208,31 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable return Action.SCHEDULED; } + void onFlushed(long bytes) throws IOException + { + // For the given flushed bytes, we want to only + // forward those that belong to data frame content. + for (Entry entry : actives) + { + int frameBytesLeft = entry.getFrameBytesRemaining(); + if (frameBytesLeft > 0) + { + int update = (int)Math.min(bytes, frameBytesLeft); + entry.onFrameBytesFlushed(update); + bytes -= update; + IStream stream = entry.stream; + if (stream != null && !entry.isControl()) + { + Object channel = stream.getAttribute(IStream.CHANNEL_ATTRIBUTE); + if (channel instanceof WriteFlusher.Listener) + ((WriteFlusher.Listener)channel).onFlushed(update - Frame.HEADER_LENGTH); + } + if (bytes == 0) + break; + } + } + } + @Override public void succeeded() { @@ -234,13 +260,13 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable for (int i = index; i < actives.size(); ++i) { Entry entry = actives.get(i); - if (entry.dataRemaining() > 0) + if (entry.getDataBytesRemaining() > 0) append(entry); } for (int i = 0; i < index; ++i) { Entry entry = actives.get(i); - if (entry.dataRemaining() > 0) + if (entry.getDataBytesRemaining() > 0) append(entry); } stalled = null; @@ -333,7 +359,11 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable this.stream = stream; } - public int dataRemaining() + public abstract int getFrameBytesRemaining(); + + public abstract void onFrameBytesFlushed(int bytesFlushed); + + public int getDataBytesRemaining() { return 0; } @@ -387,6 +417,17 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable } } + private boolean isControl() + { + switch (frame.getType()) + { + case DATA: + return false; + default: + return true; + } + } + @Override public String toString() { diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java index 32e7c136323..e8b1c317146 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java @@ -955,6 +955,11 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio { } + void onFlushed(long bytes) throws IOException + { + flusher.onFlushed(bytes); + } + public void disconnect() { if (LOG.isDebugEnabled()) @@ -1132,15 +1137,28 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio private class ControlEntry extends HTTP2Flusher.Entry { private int bytes; + private int frameBytes; private ControlEntry(Frame frame, IStream stream, Callback callback) { super(frame, stream, callback); } + @Override + public int getFrameBytesRemaining() + { + return frameBytes; + } + + @Override + public void onFrameBytesFlushed(int bytesFlushed) + { + frameBytes -= bytesFlushed; + } + protected boolean generate(ByteBufferPool.Lease lease) { - bytes = generator.control(lease, frame); + bytes = frameBytes = generator.control(lease, frame); if (LOG.isDebugEnabled()) LOG.debug("Generated {}", frame); prepare(); @@ -1238,7 +1256,8 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio private class DataEntry extends HTTP2Flusher.Entry { private int bytes; - private int dataRemaining; + private int frameBytes; + private int dataBytes; private int dataWritten; private DataEntry(DataFrame frame, IStream stream, Callback callback) @@ -1249,35 +1268,47 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio // of data frames that cannot be completely written due to // the flow control window exhausting, since in that case // we would have to count the padding only once. - dataRemaining = frame.remaining(); + dataBytes = frame.remaining(); } @Override - public int dataRemaining() + public int getFrameBytesRemaining() { - return dataRemaining; + return frameBytes; + } + + @Override + public void onFrameBytesFlushed(int bytesFlushed) + { + frameBytes -= bytesFlushed; + } + + @Override + public int getDataBytesRemaining() + { + return dataBytes; } protected boolean generate(ByteBufferPool.Lease lease) { - int dataRemaining = dataRemaining(); + int dataBytes = getDataBytesRemaining(); int sessionSendWindow = getSendWindow(); int streamSendWindow = stream.updateSendWindow(0); int window = Math.min(streamSendWindow, sessionSendWindow); - if (window <= 0 && dataRemaining > 0) + if (window <= 0 && dataBytes > 0) return false; - int length = Math.min(dataRemaining, window); + int length = Math.min(dataBytes, window); // Only one DATA frame is generated. - bytes = generator.data(lease, (DataFrame)frame, length); + bytes = frameBytes = generator.data(lease, (DataFrame)frame, length); int written = bytes - Frame.HEADER_LENGTH; if (LOG.isDebugEnabled()) - LOG.debug("Generated {}, length/window/data={}/{}/{}", frame, written, window, dataRemaining); + LOG.debug("Generated {}, length/window/data={}/{}/{}", frame, written, window, dataBytes); this.dataWritten = written; - this.dataRemaining -= written; + this.dataBytes -= written; flowControl.onDataSending(stream, written); @@ -1292,7 +1323,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio // Do we have more to send ? DataFrame dataFrame = (DataFrame)frame; - if (dataRemaining() == 0) + if (getDataBytesRemaining() == 0) { // Only now we can update the close state // and eventually remove the stream. diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2.java index 7815d9c946c..8bcc8f8a0f0 100644 --- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2.java +++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2.java @@ -62,6 +62,11 @@ public class HttpClientTransportOverHTTP2 extends AbstractHttpClientTransport }); } + public HTTP2Client getHTTP2Client() + { + return client; + } + @ManagedAttribute(value = "The number of selectors", readonly = true) public int getSelectors() { diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java index 1a8168c53b5..484308bf033 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java @@ -38,6 +38,7 @@ import org.eclipse.jetty.http2.frames.DataFrame; import org.eclipse.jetty.http2.frames.HeadersFrame; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.EndPoint; +import org.eclipse.jetty.io.WriteFlusher; import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.HttpChannel; import org.eclipse.jetty.server.HttpConfiguration; @@ -48,7 +49,7 @@ import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; -public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable +public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable, WriteFlusher.Listener { private static final Logger LOG = Log.getLogger(HttpChannelOverHTTP2.class); private static final HttpField SERVER_VERSION = new PreEncodedHttpField(HttpHeader.SERVER, HttpConfiguration.SERVER_VERSION); @@ -85,6 +86,12 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable return getStream().getIdleTimeout(); } + @Override + public void onFlushed(long bytes) throws IOException + { + getResponse().getHttpOutput().onFlushed(bytes); + } + public Runnable onRequest(HeadersFrame frame) { try diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java b/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java index 7f7c6070051..81a2cf32e2b 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java @@ -35,7 +35,6 @@ import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.Invocable; import org.eclipse.jetty.util.thread.Invocable.InvocationType; - /** * A Utility class to help implement {@link EndPoint#write(Callback, ByteBuffer...)} by calling * {@link EndPoint#flush(ByteBuffer...)} until all content is written. @@ -60,7 +59,7 @@ abstract public class WriteFlusher // fill the state machine __stateTransitions.put(StateType.IDLE, EnumSet.of(StateType.WRITING)); __stateTransitions.put(StateType.WRITING, EnumSet.of(StateType.IDLE, StateType.PENDING, StateType.FAILED)); - __stateTransitions.put(StateType.PENDING, EnumSet.of(StateType.COMPLETING,StateType.IDLE)); + __stateTransitions.put(StateType.PENDING, EnumSet.of(StateType.COMPLETING, StateType.IDLE)); __stateTransitions.put(StateType.COMPLETING, EnumSet.of(StateType.IDLE, StateType.PENDING, StateType.FAILED)); __stateTransitions.put(StateType.FAILED, EnumSet.of(StateType.IDLE)); } @@ -104,29 +103,30 @@ abstract public class WriteFlusher /** * Tries to update the current state to the given new state. + * * @param previous the expected current state - * @param next the desired new state + * @param next the desired new state * @return the previous state or null if the state transition failed * @throws WritePendingException if currentState is WRITING and new state is WRITING (api usage error) */ - private boolean updateState(State previous,State next) + private boolean updateState(State previous, State next) { - if (!isTransitionAllowed(previous,next)) + if (!isTransitionAllowed(previous, next)) throw new IllegalStateException(); boolean updated = _state.compareAndSet(previous, next); if (DEBUG) - LOG.debug("update {}:{}{}{}", this, previous, updated?"-->":"!->",next); + LOG.debug("update {}:{}{}{}", this, previous, updated ? "-->" : "!->", next); return updated; } private void fail(PendingState pending) { State current = _state.get(); - if (current.getType()==StateType.FAILED) + if (current.getType() == StateType.FAILED) { - FailedState failed=(FailedState)current; - if (updateState(failed,__IDLE)) + FailedState failed = (FailedState)current; + if (updateState(failed, __IDLE)) { pending.fail(failed.getCause()); return; @@ -138,9 +138,9 @@ abstract public class WriteFlusher private void ignoreFail() { State current = _state.get(); - while (current.getType()==StateType.FAILED) + while (current.getType() == StateType.FAILED) { - if (updateState(current,__IDLE)) + if (updateState(current, __IDLE)) return; current = _state.get(); } @@ -209,10 +209,11 @@ abstract public class WriteFlusher private static class FailedState extends State { private final Throwable _cause; + private FailedState(Throwable cause) { super(StateType.FAILED); - _cause=cause; + _cause = cause; } public Throwable getCause() @@ -257,7 +258,7 @@ abstract public class WriteFlusher protected boolean fail(Throwable cause) { - if (_callback!=null) + if (_callback != null) { _callback.failed(cause); return true; @@ -267,7 +268,7 @@ abstract public class WriteFlusher protected void complete() { - if (_callback!=null) + if (_callback != null) _callback.succeeded(); } @@ -286,8 +287,8 @@ abstract public class WriteFlusher { State s = _state.get(); return (s instanceof PendingState) - ?((PendingState)s).getCallbackInvocationType() - :Invocable.InvocationType.BLOCKING; + ? ((PendingState)s).getCallbackInvocationType() + : Invocable.InvocationType.BLOCKING; } /** @@ -300,13 +301,13 @@ abstract public class WriteFlusher * Tries to switch state to WRITING. If successful it writes the given buffers to the EndPoint. If state transition * fails it'll fail the callback. * - * If not all buffers can be written in one go it creates a new PendingState object to preserve the state + * If not all buffers can be written in one go it creates a new {@code PendingState} object to preserve the state * and then calls {@link #onIncompleteFlush()}. The remaining buffers will be written in {@link #completeWrite()}. * * If all buffers have been written it calls callback.complete(). * * @param callback the callback to call on either failed or complete - * @param buffers the buffers to flush to the endpoint + * @param buffers the buffers to flush to the endpoint * @throws WritePendingException if unable to write due to prior pending write */ public void write(Callback callback, ByteBuffer... buffers) throws WritePendingException @@ -314,20 +315,20 @@ abstract public class WriteFlusher if (DEBUG) LOG.debug("write: {} {}", this, BufferUtil.toDetailString(buffers)); - if (!updateState(__IDLE,__WRITING)) + if (!updateState(__IDLE, __WRITING)) throw new WritePendingException(); try { - buffers=flush(buffers); + buffers = flush(buffers); // if we are incomplete? - if (buffers!=null) + if (buffers != null) { if (DEBUG) LOG.debug("flushed incomplete"); - PendingState pending=new PendingState(buffers, callback); - if (updateState(__WRITING,pending)) + PendingState pending = new PendingState(buffers, callback); + if (updateState(__WRITING, pending)) onIncompleteFlush(); else fail(pending); @@ -335,18 +336,18 @@ abstract public class WriteFlusher } // If updateState didn't succeed, we don't care as our buffers have been written - if (!updateState(__WRITING,__IDLE)) + if (!updateState(__WRITING, __IDLE)) ignoreFail(); - if (callback!=null) + if (callback != null) callback.succeeded(); } catch (IOException e) { if (DEBUG) LOG.debug("write exception", e); - if (updateState(__WRITING,__IDLE)) + if (updateState(__WRITING, __IDLE)) { - if (callback!=null) + if (callback != null) callback.failed(e); } else @@ -354,7 +355,6 @@ abstract public class WriteFlusher } } - /** * Complete a write that has not completed and that called {@link #onIncompleteFlush()} to request a call to this * method when a call to {@link EndPoint#flush(ByteBuffer...)} is likely to be able to progress. @@ -370,27 +370,27 @@ abstract public class WriteFlusher State previous = _state.get(); - if (previous.getType()!=StateType.PENDING) + if (previous.getType() != StateType.PENDING) return; // failure already handled. PendingState pending = (PendingState)previous; - if (!updateState(pending,__COMPLETING)) + if (!updateState(pending, __COMPLETING)) return; // failure already handled. try { ByteBuffer[] buffers = pending.getBuffers(); - buffers=flush(buffers); + buffers = flush(buffers); // if we are incomplete? - if (buffers!=null) + if (buffers != null) { if (DEBUG) - LOG.debug("flushed incomplete {}",BufferUtil.toDetailString(buffers)); - if (buffers!=pending.getBuffers()) - pending=new PendingState(buffers, pending._callback); - if (updateState(__COMPLETING,pending)) + LOG.debug("flushed incomplete {}", BufferUtil.toDetailString(buffers)); + if (buffers != pending.getBuffers()) + pending = new PendingState(buffers, pending._callback); + if (updateState(__COMPLETING, pending)) onIncompleteFlush(); else fail(pending); @@ -398,7 +398,7 @@ abstract public class WriteFlusher } // If updateState didn't succeed, we don't care as our buffers have been written - if (!updateState(__COMPLETING,__IDLE)) + if (!updateState(__COMPLETING, __IDLE)) ignoreFail(); pending.complete(); } @@ -406,7 +406,7 @@ abstract public class WriteFlusher { if (DEBUG) LOG.debug("completeWrite exception", e); - if(updateState(__COMPLETING,__IDLE)) + if (updateState(__COMPLETING, __IDLE)) pending.fail(e); else fail(pending); @@ -422,59 +422,84 @@ abstract public class WriteFlusher */ protected ByteBuffer[] flush(ByteBuffer[] buffers) throws IOException { - boolean progress=true; - while(progress && buffers!=null) + boolean progress = true; + while (progress && buffers != null) { - int before=buffers.length==0?0:buffers[0].remaining(); - boolean flushed=_endPoint.flush(buffers); - int r=buffers.length==0?0:buffers[0].remaining(); + long before = remaining(buffers); + boolean flushed = _endPoint.flush(buffers); + long after = remaining(buffers); + long written = before - after; if (LOG.isDebugEnabled()) - LOG.debug("Flushed={} {}/{}+{} {}",flushed,before-r,before,buffers.length-1,this); + LOG.debug("Flushed={} written={} remaining={} {}", flushed, written, after, this); + + if (written > 0) + { + Connection connection = _endPoint.getConnection(); + if (connection instanceof Listener) + ((Listener)connection).onFlushed(written); + } if (flushed) return null; - progress=before!=r; + progress = written > 0; - int not_empty=0; - while(r==0) + int index = 0; + while (true) { - if (++not_empty==buffers.length) + if (index == buffers.length) { - buffers=null; - not_empty=0; + // All buffers consumed. + buffers = null; + index = 0; break; } - progress=true; - r=buffers[not_empty].remaining(); + else + { + int remaining = buffers[index].remaining(); + if (remaining > 0) + break; + ++index; + progress = true; + } } - - if (not_empty>0) - buffers=Arrays.copyOfRange(buffers,not_empty,buffers.length); + if (index > 0) + buffers = Arrays.copyOfRange(buffers, index, buffers.length); } if (LOG.isDebugEnabled()) - LOG.debug("!fully flushed {}",this); + LOG.debug("!fully flushed {}", this); // If buffers is null, then flush has returned false but has consumed all the data! // This is probably SSL being unable to flush the encrypted buffer, so return EMPTY_BUFFERS // and that will keep this WriteFlusher pending. - return buffers==null?EMPTY_BUFFERS:buffers; + return buffers == null ? EMPTY_BUFFERS : buffers; } - /* ------------------------------------------------------------ */ - /** Notify the flusher of a failure + private long remaining(ByteBuffer[] buffers) + { + if (buffers == null) + return 0; + long result = 0; + for (ByteBuffer buffer : buffers) + result += buffer.remaining(); + return result; + } + + /** + * Notify the flusher of a failure + * * @param cause The cause of the failure * @return true if the flusher passed the failure to a {@link Callback} instance */ public boolean onFail(Throwable cause) { // Keep trying to handle the failure until we get to IDLE or FAILED state - while(true) + while (true) { - State current=_state.get(); - switch(current.getType()) + State current = _state.get(); + switch (current.getType()) { case IDLE: case FAILED: @@ -487,7 +512,7 @@ abstract public class WriteFlusher LOG.debug("failed: " + this, cause); PendingState pending = (PendingState)current; - if (updateState(pending,__IDLE)) + if (updateState(pending, __IDLE)) return pending.fail(cause); break; @@ -495,7 +520,7 @@ abstract public class WriteFlusher if (DEBUG) LOG.debug("failed: " + this, cause); - if (updateState(current,new FailedState(cause))) + if (updateState(current, new FailedState(cause))) return false; break; } @@ -512,29 +537,9 @@ abstract public class WriteFlusher return _state.get().getType() == StateType.IDLE; } - public boolean isInProgress() - { - switch(_state.get().getType()) - { - case WRITING: - case PENDING: - case COMPLETING: - return true; - default: - return false; - } - } - - @Override - public String toString() - { - State s = _state.get(); - return String.format("WriteFlusher@%x{%s}->%s", hashCode(), s,s instanceof PendingState?((PendingState)s).getCallback():null); - } - public String toStateString() { - switch(_state.get().getType()) + switch (_state.get().getType()) { case WRITING: return "W"; @@ -550,4 +555,33 @@ abstract public class WriteFlusher return "?"; } } + + @Override + public String toString() + { + State s = _state.get(); + return String.format("WriteFlusher@%x{%s}->%s", hashCode(), s, s instanceof PendingState ? ((PendingState)s).getCallback() : null); + } + + /** + *

A listener of {@link WriteFlusher} events.

+ */ + public interface Listener + { + /** + *

Invoked when a {@link WriteFlusher} flushed bytes in a non-blocking way, + * as part of a - possibly larger - write.

+ *

This method may be invoked multiple times, for example when writing a large + * buffer: a first flush of bytes, then the connection became TCP congested, and + * a subsequent flush of bytes when the connection became writable again.

+ *

This method is never invoked concurrently, but may be invoked by different + * threads, so implementations may not rely on thread-local variables.

+ *

Implementations may throw an {@link IOException} to signal that the write + * should fail, for example if the implementation enforces a minimum data rate.

+ * + * @param bytes the number of bytes flushed + * @throws IOException if the write should fail + */ + void onFlushed(long bytes) throws IOException; + } } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConfiguration.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConfiguration.java index b953a1b584d..49783b706f7 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConfiguration.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConfiguration.java @@ -67,6 +67,7 @@ public class HttpConfiguration private int _maxErrorDispatches = 10; private boolean _useDirectByteBuffers = false; private long _minRequestDataRate; + private long _minResponseDataRate; private CookieCompliance _cookieCompliance = CookieCompliance.RFC6265; private boolean _notifyRemoteAsyncErrors = true; @@ -129,6 +130,7 @@ public class HttpConfiguration _maxErrorDispatches=config._maxErrorDispatches; _useDirectByteBuffers=config._useDirectByteBuffers; _minRequestDataRate=config._minRequestDataRate; + _minResponseDataRate=config._minResponseDataRate; _cookieCompliance=config._cookieCompliance; _notifyRemoteAsyncErrors=config._notifyRemoteAsyncErrors; } @@ -512,7 +514,29 @@ public class HttpConfiguration { _minRequestDataRate=bytesPerSecond; } - + + /** + * @return The minimum response data rate in bytes per second; or <=0 for no limit + */ + @ManagedAttribute("The minimum response content data rate in bytes per second") + public long getMinResponseDataRate() + { + return _minResponseDataRate; + } + + /** + *

Sets an minimum response content data rate.

+ *

The value is enforced only approximately - not precisely - due to the fact that + * for efficiency reasons buffer writes may be comprised of both response headers and + * response content.

+ * + * @param bytesPerSecond The minimum response data rate in bytes per second; or <=0 for no limit + */ + public void setMinResponseDataRate(long bytesPerSecond) + { + _minResponseDataRate = bytesPerSecond; + } + public CookieCompliance getCookieCompliance() { return _cookieCompliance; diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java index 4da4ddd87d1..37c4e9e9c20 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java @@ -40,6 +40,7 @@ import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EofException; +import org.eclipse.jetty.io.WriteFlusher; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.IteratingCallback; @@ -49,7 +50,7 @@ import org.eclipse.jetty.util.log.Logger; /** *

A {@link Connection} that handles the HTTP protocol.

*/ -public class HttpConnection extends AbstractConnection implements Runnable, HttpTransport, Connection.UpgradeFrom +public class HttpConnection extends AbstractConnection implements Runnable, HttpTransport, Connection.UpgradeFrom, WriteFlusher.Listener { private static final Logger LOG = Log.getLogger(HttpConnection.class); public static final HttpField CONNECTION_CLOSE = new PreEncodedHttpField(HttpHeader.CONNECTION,HttpHeaderValue.CLOSE.asString()); @@ -192,6 +193,14 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http return null; } + @Override + public void onFlushed(long bytes) throws IOException + { + // Unfortunately cannot distinguish between header and content + // bytes, and for content bytes whether they are chunked or not. + _channel.getResponse().getHttpOutput().onFlushed(bytes); + } + void releaseRequestBuffer() { if (_requestBuffer != null && !_requestBuffer.hasRemaining()) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java index 47f9655430d..504ab78d062 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java @@ -282,7 +282,7 @@ public class HttpInput extends ServletInputStream implements Runnable { long minimum_data = minRequestDataRate * TimeUnit.NANOSECONDS.toMillis(period) / TimeUnit.SECONDS.toMillis(1); if (_contentArrived < minimum_data) - throw new BadMessageException(HttpStatus.REQUEST_TIMEOUT_408,String.format("Request data rate < %d B/s",minRequestDataRate)); + throw new BadMessageException(HttpStatus.REQUEST_TIMEOUT_408,String.format("Request content data rate < %d B/s",minRequestDataRate)); } } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java index 7c6691a7c01..ba07598ad7b 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java @@ -24,6 +24,7 @@ import java.io.InputStream; import java.nio.ByteBuffer; import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritePendingException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import javax.servlet.RequestDispatcher; @@ -122,12 +123,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable private final HttpChannel _channel; private final SharedBlockingCallback _writeBlocker; private Interceptor _interceptor; - - /** - * Bytes written via the write API (excludes bytes written via sendContent). Used to autocommit once content length is written. - */ private long _written; - + private long _flushed; + private long _firstByteTimeStamp = -1; private ByteBuffer _aggregate; private int _bufferSize; private int _commitSize; @@ -231,6 +229,14 @@ public class HttpOutput extends ServletOutputStream implements Runnable protected void write(ByteBuffer content, boolean complete, Callback callback) { + if (_firstByteTimeStamp == -1) + { + long minDataRate = getHttpChannel().getHttpConfiguration().getMinResponseDataRate(); + if (minDataRate > 0) + _firstByteTimeStamp = System.nanoTime(); + else + _firstByteTimeStamp = Long.MAX_VALUE; + } _interceptor.write(content, complete, callback); } @@ -908,6 +914,30 @@ public class HttpOutput extends ServletOutputStream implements Runnable _commitSize = size; } + /** + *

Invoked when bytes have been flushed to the network.

+ *

The number of flushed bytes may be different from the bytes written + * by the application if an {@link Interceptor} changed them, for example + * by compressing them.

+ * + * @param bytes the number of bytes flushed + * @throws IOException if the minimum data rate, when set, is not respected + * @see org.eclipse.jetty.io.WriteFlusher.Listener + */ + public void onFlushed(long bytes) throws IOException + { + if (_firstByteTimeStamp == -1 || _firstByteTimeStamp == Long.MAX_VALUE) + return; + long minDataRate = getHttpChannel().getHttpConfiguration().getMinResponseDataRate(); + _flushed += bytes; + long elapsed = System.nanoTime() - _firstByteTimeStamp; + long minFlushed = minDataRate * TimeUnit.NANOSECONDS.toMillis(elapsed) / TimeUnit.SECONDS.toMillis(1); + if (LOG.isDebugEnabled()) + LOG.debug("Flushed bytes min/actual {}/{}", minFlushed, _flushed); + if (_flushed < minFlushed) + throw new IOException(String.format("Response content data rate < %d B/s", minDataRate)); + } + public void recycle() { _interceptor = _channel; @@ -920,6 +950,8 @@ public class HttpOutput extends ServletOutputStream implements Runnable _written = 0; _writeListener = null; _onError = null; + _firstByteTimeStamp = -1; + _flushed = 0; reopen(); } diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java index db26786c0e4..bb81fc6b5a7 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java @@ -19,6 +19,8 @@ package org.eclipse.jetty.util.thread.strategy; import java.io.Closeable; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.LongAdder; @@ -32,8 +34,6 @@ import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.ExecutionStrategy; import org.eclipse.jetty.util.thread.Invocable; import org.eclipse.jetty.util.thread.Invocable.InvocationType; -import org.eclipse.jetty.util.thread.Locker; -import org.eclipse.jetty.util.thread.Locker.Lock; import org.eclipse.jetty.util.thread.ReservedThreadExecutor; /** @@ -66,9 +66,8 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat { private static final Logger LOG = Log.getLogger(EatWhatYouKill.class); - private enum State { IDLE, PRODUCING, REPRODUCING } - - private final Locker _locker = new Locker(); + private enum State { IDLE, PENDING, PRODUCING, REPRODUCING } + private final LongAdder _nonBlocking = new LongAdder(); private final LongAdder _blocking = new LongAdder(); private final LongAdder _executed = new LongAdder(); @@ -94,19 +93,20 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat _producers = producers; addBean(_producer); if (LOG.isDebugEnabled()) - LOG.debug("{} created", this); + LOG.debug("{} created", this); } @Override public void dispatch() { boolean execute = false; - try (Lock locked = _locker.lock()) + synchronized(this) { switch(_state) { case IDLE: execute = true; + _state = State.PENDING; break; case PRODUCING: @@ -136,19 +136,19 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat { if (LOG.isDebugEnabled()) LOG.debug("{} produce", this); - boolean reproduce = true; - while(isRunning() && tryProduce(reproduce) && doProduce()) - reproduce = false; + if (tryProduce()) + doProduce(); } - public boolean tryProduce(boolean reproduce) + private boolean tryProduce() { boolean producing = false; - try (Lock locked = _locker.lock()) + synchronized(this) { switch (_state) { case IDLE: + case PENDING: // Enter PRODUCING _state = State.PRODUCING; producing = true; @@ -156,8 +156,7 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat case PRODUCING: // Keep other Thread producing - if (reproduce) - _state = State.REPRODUCING; + _state = State.REPRODUCING; break; default: @@ -167,7 +166,7 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat return producing; } - public boolean doProduce() + private void doProduce() { boolean producing = true; while (isRunning() && producing) @@ -178,30 +177,28 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat { task = _producer.produce(); } - catch(Throwable e) + catch (Throwable e) { LOG.warn(e); } - - if (LOG.isDebugEnabled()) - LOG.debug("{} t={}/{}",this,task,Invocable.getInvocationType(task)); - + if (task==null) { - try (Lock locked = _locker.lock()) + synchronized(this) { - // Could another one just have been queued with a produce call? - if (_state==State.REPRODUCING) + // Could another task just have been queued with a produce call? + switch (_state) { - _state = State.PRODUCING; - } - else - { - if (LOG.isDebugEnabled()) - LOG.debug("{} IDLE",toStringLocked()); - _state = State.IDLE; - producing = false; - } + case PRODUCING: + _state = State.IDLE; + producing = false; + break; + case REPRODUCING: + _state = State.PRODUCING; + break; + default: + throw new IllegalStateException(toStringLocked()); + } } } else @@ -209,21 +206,19 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat boolean consume; if (Invocable.getInvocationType(task) == InvocationType.NON_BLOCKING) { - // PRODUCE CONSUME (EWYK!) - if (LOG.isDebugEnabled()) - LOG.debug("{} PC t={}", this, task); + // PRODUCE CONSUME consume = true; - _nonBlocking.increment(); + _nonBlocking.increment(); } else { - try (Lock locked = _locker.lock()) + synchronized(this) { if (_producers.tryExecute(this)) { // EXECUTE PRODUCE CONSUME! // We have executed a new Producer, so we can EWYK consume - _state = State.IDLE; + _state = State.PENDING; producing = false; consume = true; _blocking.increment(); @@ -233,13 +228,13 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat // PRODUCE EXECUTE CONSUME! consume = false; _executed.increment(); - } + } } - - if (LOG.isDebugEnabled()) - LOG.debug("{} {} t={}", this, consume ? "EPC" : "PEC", task); } + if (LOG.isDebugEnabled()) + LOG.debug("{} p={} c={} t={}/{}", this, producing, consume, task,Invocable.getInvocationType(task)); + // Consume or execute task try { @@ -250,7 +245,10 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat } catch (RejectedExecutionException e) { - LOG.warn(e); + if (isRunning()) + LOG.warn(e); + else + LOG.ignore(e); if (task instanceof Closeable) { try @@ -269,8 +267,6 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat } } } - - return producing; } @ManagedAttribute(value = "number of non blocking tasks consumed", readonly = true) @@ -294,7 +290,7 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat @ManagedAttribute(value = "whether this execution strategy is idle", readonly = true) public boolean isIdle() { - try (Lock locked = _locker.lock()) + synchronized(this) { return _state==State.IDLE; } @@ -310,7 +306,7 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat public String toString() { - try (Lock locked = _locker.lock()) + synchronized(this) { return toStringLocked(); } @@ -339,5 +335,14 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat builder.append(_state); builder.append('/'); builder.append(_producers); + builder.append("[nb="); + builder.append(getNonBlockingTasksConsumed()); + builder.append(",c="); + builder.append(getBlockingTasksConsumed()); + builder.append(",e="); + builder.append(getBlockingTasksExecuted()); + builder.append("]"); + builder.append("@"); + builder.append(DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())); } } diff --git a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/EmptyServerHandler.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/EmptyServerHandler.java index eab1eedf66a..87f1315671c 100644 --- a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/EmptyServerHandler.java +++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/EmptyServerHandler.java @@ -27,11 +27,16 @@ import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.handler.AbstractHandler; -public class EmptyServerHandler extends AbstractHandler +public class EmptyServerHandler extends AbstractHandler.ErrorDispatchHandler { @Override - public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + protected void doNonErrorHandle(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + jettyRequest.setHandled(true); + service(target, jettyRequest, request, response); + } + + protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { - baseRequest.setHandled(true); } } diff --git a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientTest.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientTest.java index b7a020d59e2..409254e6711 100644 --- a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientTest.java +++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientTest.java @@ -37,7 +37,6 @@ import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; -import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.api.ContentResponse; import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.util.BytesContentProvider; @@ -497,9 +496,8 @@ public class HttpClientTest extends AbstractTest start(new EmptyServerHandler() { @Override - public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { - super.handle(target, baseRequest, request, response); response.getWriter().write("Jetty"); } }); diff --git a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/ServerTimeoutsTest.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/ServerTimeoutsTest.java index f7c93c4837a..7acd68c5a95 100644 --- a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/ServerTimeoutsTest.java +++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/ServerTimeoutsTest.java @@ -43,6 +43,8 @@ import org.eclipse.jetty.client.util.BufferingResponseListener; import org.eclipse.jetty.client.util.DeferredContentProvider; import org.eclipse.jetty.http.BadMessageException; import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.http2.FlowControlStrategy; +import org.eclipse.jetty.http2.client.http.HttpClientTransportOverHTTP2; import org.eclipse.jetty.http2.server.AbstractHTTP2ServerConnectionFactory; import org.eclipse.jetty.server.HttpChannel; import org.eclipse.jetty.server.Request; @@ -50,7 +52,9 @@ import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.log.StacklessLogging; +import org.hamcrest.Matchers; import org.junit.Assert; +import org.junit.Assume; import org.junit.Test; public class ServerTimeoutsTest extends AbstractTest @@ -736,6 +740,76 @@ public class ServerTimeoutsTest extends AbstractTest Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); } + @Test + public void testBlockingWriteWithMinimumDataRateBelowLimit() throws Exception + { + // This test needs a large write to stall the server, and a slow reading client. + // In HTTP/1.1, when using the loopback interface, the buffers are so large that + // it would require a very large write (32 MiB) and a lot of time for this test + // to pass. On the first writes, the server fills in the large buffers with a lot + // of bytes (about 4 MiB), and so it would take a lot of time for the client to + // read those bytes and eventually produce a write rate that will make the server + // fail; and the write should be large enough to _not_ complete before the rate + // is below the minimum. + // In HTTP/2, we force the flow control window to be small, so that the server + // stalls almost immediately without having written many bytes, so that the test + // completes quickly. + Assume.assumeThat(transport, Matchers.isOneOf(Transport.H2, Transport.H2C)); + + int bytesPerSecond = 16 * 1024; + httpConfig.setMinResponseDataRate(bytesPerSecond); + CountDownLatch serverLatch = new CountDownLatch(1); + start(new EmptyServerHandler() + { + @Override + protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + try + { + ServletOutputStream output = response.getOutputStream(); + output.write(new byte[8 * 1024 * 1024]); + } + catch (IOException x) + { + serverLatch.countDown(); + } + } + }); + ((HttpClientTransportOverHTTP2)client.getTransport()).getHTTP2Client().setInitialStreamRecvWindow(FlowControlStrategy.DEFAULT_WINDOW_SIZE); + + // Setup the client to read slower than the min data rate. + BlockingQueue objects = new LinkedBlockingQueue<>(); + CountDownLatch clientLatch = new CountDownLatch(1); + client.newRequest(newURI()) + .onResponseContentAsync((response, content, callback) -> + { + objects.offer(content.remaining()); + objects.offer(callback); + }) + .send(result -> + { + objects.offer(-1); + objects.offer(Callback.NOOP); + if (result.isFailed()) + clientLatch.countDown(); + }); + + long readRate = bytesPerSecond / 2; + while (true) + { + int bytes = (Integer)objects.poll(5, TimeUnit.SECONDS); + if (bytes < 0) + break; + long ms = bytes * 1000L / readRate; + Thread.sleep(ms); + Callback callback = (Callback)objects.poll(); + callback.succeeded(); + } + + Assert.assertTrue(serverLatch.await(15, TimeUnit.SECONDS)); + Assert.assertTrue(clientLatch.await(15, TimeUnit.SECONDS)); + } + private static class BlockingReadHandler extends AbstractHandler.ErrorDispatchHandler { private final CountDownLatch handlerLatch; diff --git a/tests/test-sessions/test-hazelcast-sessions/pom.xml b/tests/test-sessions/test-hazelcast-sessions/pom.xml index 0633d26dd6e..aac4c7c3c8d 100644 --- a/tests/test-sessions/test-hazelcast-sessions/pom.xml +++ b/tests/test-sessions/test-hazelcast-sessions/pom.xml @@ -49,6 +49,15 @@ + + org.apache.maven.plugins + maven-surefire-plugin + + + ${project.build.testOutputDirectory}/logging.properties + + + @@ -88,7 +97,6 @@ jetty-test-helper test - diff --git a/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/ClusteredLastAccessTimeTest.java b/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/ClusteredLastAccessTimeTest.java index aabffb8dfda..204897ee78d 100644 --- a/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/ClusteredLastAccessTimeTest.java +++ b/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/ClusteredLastAccessTimeTest.java @@ -20,19 +20,29 @@ package org.eclipse.jetty.hazelcast.session; import org.eclipse.jetty.server.session.AbstractClusteredLastAccessTimeTest; import org.eclipse.jetty.server.session.SessionDataStoreFactory; +import org.junit.After; public class ClusteredLastAccessTimeTest extends AbstractClusteredLastAccessTimeTest { + HazelcastSessionDataStoreFactory factory; + /** * @see org.eclipse.jetty.server.session.AbstractTestBase#createSessionDataStoreFactory() */ @Override public SessionDataStoreFactory createSessionDataStoreFactory() { - HazelcastSessionDataStoreFactory factory = new HazelcastSessionDataStoreFactory(); + factory = new HazelcastSessionDataStoreFactory(); + factory.setMapName( Long.toString( System.currentTimeMillis() ) ); return factory; } + @After + public void shutdown() + { + factory.getHazelcastInstance().getMap( factory.getMapName() ).clear(); + } + } diff --git a/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/ClusteredOrphanedSessionTest.java b/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/ClusteredOrphanedSessionTest.java index e5a0be0a09a..466f430b647 100644 --- a/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/ClusteredOrphanedSessionTest.java +++ b/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/ClusteredOrphanedSessionTest.java @@ -21,6 +21,7 @@ package org.eclipse.jetty.hazelcast.session; import org.eclipse.jetty.server.session.AbstractClusteredOrphanedSessionTest; import org.eclipse.jetty.server.session.SessionDataStoreFactory; +import org.junit.After; /** * ClusteredOrphanedSessionTest @@ -29,6 +30,7 @@ public class ClusteredOrphanedSessionTest extends AbstractClusteredOrphanedSessionTest { + HazelcastSessionDataStoreFactory factory; /** * @see org.eclipse.jetty.server.session.AbstractTestBase#createSessionDataStoreFactory() @@ -36,9 +38,14 @@ public class ClusteredOrphanedSessionTest @Override public SessionDataStoreFactory createSessionDataStoreFactory() { - HazelcastSessionDataStoreFactory factory = new HazelcastSessionDataStoreFactory(); + factory = new HazelcastSessionDataStoreFactory(); + factory.setMapName( Long.toString( System.currentTimeMillis() ) ); return factory; } - + @After + public void shutdown() + { + factory.getHazelcastInstance().getMap( factory.getMapName() ).clear(); + } } diff --git a/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/ClusteredSessionMigrationTest.java b/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/ClusteredSessionMigrationTest.java index b6a310f22ff..361aac257bc 100644 --- a/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/ClusteredSessionMigrationTest.java +++ b/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/ClusteredSessionMigrationTest.java @@ -21,6 +21,7 @@ package org.eclipse.jetty.hazelcast.session; import org.eclipse.jetty.server.session.AbstractClusteredSessionMigrationTest; import org.eclipse.jetty.server.session.SessionDataStoreFactory; +import org.junit.After; /** * ClusteredSessionMigrationTest @@ -28,6 +29,7 @@ import org.eclipse.jetty.server.session.SessionDataStoreFactory; public class ClusteredSessionMigrationTest extends AbstractClusteredSessionMigrationTest { + HazelcastSessionDataStoreFactory factory; /** * @see org.eclipse.jetty.server.session.AbstractTestBase#createSessionDataStoreFactory() @@ -35,7 +37,14 @@ public class ClusteredSessionMigrationTest @Override public SessionDataStoreFactory createSessionDataStoreFactory() { - HazelcastSessionDataStoreFactory factory = new HazelcastSessionDataStoreFactory(); + factory = new HazelcastSessionDataStoreFactory(); + factory.setMapName( Long.toString( System.currentTimeMillis() ) ); return factory; } + + @After + public void shutdown() + { + factory.getHazelcastInstance().getMap( factory.getMapName() ).clear(); + } } diff --git a/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/ClusteredSessionScavengingTest.java b/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/ClusteredSessionScavengingTest.java index 0d626c33fa7..e4bd2f8868f 100644 --- a/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/ClusteredSessionScavengingTest.java +++ b/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/ClusteredSessionScavengingTest.java @@ -21,6 +21,7 @@ package org.eclipse.jetty.hazelcast.session; import org.eclipse.jetty.server.session.AbstractClusteredSessionScavengingTest; import org.eclipse.jetty.server.session.SessionDataStoreFactory; +import org.junit.After; /** * ClusteredSessionScavengingTest @@ -29,13 +30,22 @@ public class ClusteredSessionScavengingTest extends AbstractClusteredSessionScavengingTest { + HazelcastSessionDataStoreFactory factory; + /** * @see org.eclipse.jetty.server.session.AbstractTestBase#createSessionDataStoreFactory() */ @Override public SessionDataStoreFactory createSessionDataStoreFactory() { - HazelcastSessionDataStoreFactory factory = new HazelcastSessionDataStoreFactory(); + factory = new HazelcastSessionDataStoreFactory(); + factory.setMapName( Long.toString( System.currentTimeMillis() ) ); return factory; } + + @After + public void shutdown() + { + factory.getHazelcastInstance().getMap( factory.getMapName() ).clear(); + } } diff --git a/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/ModifyMaxInactiveIntervalTest.java b/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/ModifyMaxInactiveIntervalTest.java index 5570d0946ba..68d9a51d468 100644 --- a/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/ModifyMaxInactiveIntervalTest.java +++ b/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/ModifyMaxInactiveIntervalTest.java @@ -21,6 +21,7 @@ package org.eclipse.jetty.hazelcast.session; import org.eclipse.jetty.server.session.AbstractModifyMaxInactiveIntervalTest; import org.eclipse.jetty.server.session.SessionDataStoreFactory; +import org.junit.After; /** * ModifyMaxInactiveIntervalTest @@ -29,14 +30,23 @@ public class ModifyMaxInactiveIntervalTest extends AbstractModifyMaxInactiveIntervalTest { + HazelcastSessionDataStoreFactory factory; + /** * @see org.eclipse.jetty.server.session.AbstractTestBase#createSessionDataStoreFactory() */ @Override public SessionDataStoreFactory createSessionDataStoreFactory() { - HazelcastSessionDataStoreFactory factory = new HazelcastSessionDataStoreFactory(); + factory = new HazelcastSessionDataStoreFactory(); + factory.setMapName( Long.toString( System.currentTimeMillis() ) ); return factory; } + @After + public void shutdown() + { + factory.getHazelcastInstance().getMap( factory.getMapName() ).clear(); + } + } diff --git a/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/NonClusteredSessionScavengingTest.java b/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/NonClusteredSessionScavengingTest.java index 8c024268b0f..e364bb085ad 100644 --- a/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/NonClusteredSessionScavengingTest.java +++ b/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/NonClusteredSessionScavengingTest.java @@ -21,6 +21,7 @@ package org.eclipse.jetty.hazelcast.session; import org.eclipse.jetty.server.session.AbstractNonClusteredSessionScavengingTest; import org.eclipse.jetty.server.session.SessionDataStoreFactory; +import org.junit.After; import static org.junit.Assert.*; @@ -31,6 +32,8 @@ public class NonClusteredSessionScavengingTest extends AbstractNonClusteredSessionScavengingTest { + HazelcastSessionDataStoreFactory factory; + /** * @see org.eclipse.jetty.server.session.AbstractNonClusteredSessionScavengingTest#assertSession(java.lang.String, boolean) */ @@ -63,7 +66,14 @@ public class NonClusteredSessionScavengingTest @Override public SessionDataStoreFactory createSessionDataStoreFactory() { - HazelcastSessionDataStoreFactory factory = new HazelcastSessionDataStoreFactory(); + factory = new HazelcastSessionDataStoreFactory(); + factory.setMapName( Long.toString( System.currentTimeMillis() ) ); return factory; } + + @After + public void shutdown() + { + factory.getHazelcastInstance().getMap( factory.getMapName() ).clear(); + } } diff --git a/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/SessionExpiryTest.java b/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/SessionExpiryTest.java index ae911a0614b..2d51f25109d 100644 --- a/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/SessionExpiryTest.java +++ b/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/SessionExpiryTest.java @@ -21,20 +21,30 @@ package org.eclipse.jetty.hazelcast.session; import org.eclipse.jetty.server.session.AbstractSessionExpiryTest; import org.eclipse.jetty.server.session.SessionDataStoreFactory; +import org.junit.After; import org.junit.Test; public class SessionExpiryTest extends AbstractSessionExpiryTest { + HazelcastSessionDataStoreFactory factory; + /** * @see org.eclipse.jetty.server.session.AbstractTestBase#createSessionDataStoreFactory() */ @Override public SessionDataStoreFactory createSessionDataStoreFactory() { - HazelcastSessionDataStoreFactory factory = new HazelcastSessionDataStoreFactory(); + factory = new HazelcastSessionDataStoreFactory(); + factory.setMapName( Long.toString( System.currentTimeMillis() ) ); return factory; } + @After + public void shutdown() + { + factory.getHazelcastInstance().getMap( factory.getMapName() ).clear(); + } + } diff --git a/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/SessionInvalidateCreateScavengeTest.java b/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/SessionInvalidateCreateScavengeTest.java index c958fb6e6c5..434b11595cb 100644 --- a/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/SessionInvalidateCreateScavengeTest.java +++ b/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/SessionInvalidateCreateScavengeTest.java @@ -21,6 +21,7 @@ package org.eclipse.jetty.hazelcast.session; import org.eclipse.jetty.server.session.AbstractSessionInvalidateCreateScavengeTest; import org.eclipse.jetty.server.session.SessionDataStoreFactory; +import org.junit.After; /** * SessionInvalidateCreateScavengeTest @@ -29,13 +30,22 @@ public class SessionInvalidateCreateScavengeTest extends AbstractSessionInvalidateCreateScavengeTest { + HazelcastSessionDataStoreFactory factory; + /** * @see org.eclipse.jetty.server.session.AbstractTestBase#createSessionDataStoreFactory() */ @Override public SessionDataStoreFactory createSessionDataStoreFactory() { - HazelcastSessionDataStoreFactory factory = new HazelcastSessionDataStoreFactory(); + factory = new HazelcastSessionDataStoreFactory(); + factory.setMapName( Long.toString( System.currentTimeMillis() ) ); return factory; } + + @After + public void shutdown() + { + factory.getHazelcastInstance().getMap( factory.getMapName() ).clear(); + } } diff --git a/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/client/ClientLastAccessTimeTest.java b/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/client/ClientLastAccessTimeTest.java index 2078882ee21..f2549d7b964 100644 --- a/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/client/ClientLastAccessTimeTest.java +++ b/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/client/ClientLastAccessTimeTest.java @@ -32,7 +32,7 @@ public class ClientLastAccessTimeTest extends AbstractClusteredLastAccessTimeTest { - private static final String MAP_NAME = "jetty_foo_session"; + private static final String MAP_NAME = Long.toString( System.currentTimeMillis() ); private HazelcastInstance hazelcastInstance; diff --git a/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/client/ClientModifyMaxInactiveIntervalTest.java b/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/client/ClientModifyMaxInactiveIntervalTest.java index f74d05746c9..effcafdecca 100644 --- a/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/client/ClientModifyMaxInactiveIntervalTest.java +++ b/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/client/ClientModifyMaxInactiveIntervalTest.java @@ -19,9 +19,15 @@ package org.eclipse.jetty.hazelcast.session.client; +import com.hazelcast.config.Config; +import com.hazelcast.config.MapConfig; +import com.hazelcast.core.Hazelcast; +import com.hazelcast.core.HazelcastInstance; import org.eclipse.jetty.hazelcast.session.HazelcastSessionDataStoreFactory; import org.eclipse.jetty.server.session.AbstractModifyMaxInactiveIntervalTest; import org.eclipse.jetty.server.session.SessionDataStoreFactory; +import org.junit.After; +import org.junit.Before; /** * ModifyMaxInactiveIntervalTest @@ -30,6 +36,27 @@ public class ClientModifyMaxInactiveIntervalTest extends AbstractModifyMaxInactiveIntervalTest { + private static final String MAP_NAME = Long.toString( System.currentTimeMillis() ); + + private HazelcastInstance hazelcastInstance; + + @Before + public void startHazelcast() + throws Exception + { + Config config = new Config().addMapConfig( new MapConfig().setName( MAP_NAME ) ) // + .setInstanceName( "beer" ); + // start Hazelcast instance + hazelcastInstance = Hazelcast.getOrCreateHazelcastInstance( config ); + } + + @After + public void stopHazelcast() + throws Exception + { + hazelcastInstance.shutdown(); + } + /** * @see org.eclipse.jetty.server.session.AbstractTestBase#createSessionDataStoreFactory() */ @@ -37,6 +64,8 @@ public class ClientModifyMaxInactiveIntervalTest public SessionDataStoreFactory createSessionDataStoreFactory() { HazelcastSessionDataStoreFactory factory = new HazelcastSessionDataStoreFactory(); + factory.setOnlyClient( true ); + factory.setMapName( MAP_NAME ); return factory; } diff --git a/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/client/ClientNonClusteredSessionScavengingTest.java b/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/client/ClientNonClusteredSessionScavengingTest.java index e9b00f20433..4abc6b1cedb 100644 --- a/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/client/ClientNonClusteredSessionScavengingTest.java +++ b/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/client/ClientNonClusteredSessionScavengingTest.java @@ -60,7 +60,7 @@ public class ClientNonClusteredSessionScavengingTest fail( e.getMessage() ); } } - private static final String MAP_NAME = "jetty_foo_session"; + private static final String MAP_NAME = Long.toString( System.currentTimeMillis() ); private HazelcastInstance hazelcastInstance; diff --git a/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/client/ClientOrphanedSessionTest.java b/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/client/ClientOrphanedSessionTest.java index 00943a5f036..2a08debb557 100644 --- a/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/client/ClientOrphanedSessionTest.java +++ b/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/client/ClientOrphanedSessionTest.java @@ -33,7 +33,7 @@ public class ClientOrphanedSessionTest extends AbstractClusteredOrphanedSessionTest { - private static final String MAP_NAME = "jetty_foo_session"; + private static final String MAP_NAME = Long.toString( System.currentTimeMillis() ); private HazelcastInstance hazelcastInstance; diff --git a/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/client/ClientSessionExpiryTest.java b/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/client/ClientSessionExpiryTest.java index 4533b980d8a..a2fc8a24c37 100644 --- a/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/client/ClientSessionExpiryTest.java +++ b/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/client/ClientSessionExpiryTest.java @@ -33,7 +33,7 @@ public class ClientSessionExpiryTest extends AbstractSessionExpiryTest { - private static final String MAP_NAME = "jetty_foo_session"; + private static final String MAP_NAME = Long.toString( System.currentTimeMillis() ); private HazelcastInstance hazelcastInstance; diff --git a/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/client/ClientSessionInvalidateCreateScavengeTest.java b/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/client/ClientSessionInvalidateCreateScavengeTest.java index 1219907e689..cedd39abcaa 100644 --- a/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/client/ClientSessionInvalidateCreateScavengeTest.java +++ b/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/client/ClientSessionInvalidateCreateScavengeTest.java @@ -32,7 +32,7 @@ import org.junit.Before; public class ClientSessionInvalidateCreateScavengeTest extends AbstractSessionInvalidateCreateScavengeTest { - private static final String MAP_NAME = "jetty_foo_session"; + private static final String MAP_NAME = Long.toString( System.currentTimeMillis() ); private HazelcastInstance hazelcastInstance; diff --git a/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/client/ClientSessionMigrationTest.java b/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/client/ClientSessionMigrationTest.java index e5dc2cc1a9a..ba786895c02 100644 --- a/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/client/ClientSessionMigrationTest.java +++ b/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/client/ClientSessionMigrationTest.java @@ -35,7 +35,7 @@ import org.junit.Before; public class ClientSessionMigrationTest extends AbstractClusteredSessionMigrationTest { - private static final String MAP_NAME = "jetty_foo_session"; + private static final String MAP_NAME = Long.toString( System.currentTimeMillis() ); private HazelcastInstance hazelcastInstance; diff --git a/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/client/ClientSessionScavengingTest.java b/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/client/ClientSessionScavengingTest.java index 06a34569462..e7ca25a3877 100644 --- a/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/client/ClientSessionScavengingTest.java +++ b/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/client/ClientSessionScavengingTest.java @@ -33,7 +33,7 @@ public class ClientSessionScavengingTest extends AbstractClusteredSessionScavengingTest { - private static final String MAP_NAME = "jetty_foo_session"; + private static final String MAP_NAME = Long.toString( System.currentTimeMillis() ); private HazelcastInstance hazelcastInstance; diff --git a/tests/test-sessions/test-hazelcast-sessions/src/test/resources/jetty-logging.properties b/tests/test-sessions/test-hazelcast-sessions/src/test/resources/jetty-logging.properties new file mode 100644 index 00000000000..07faa86dbce --- /dev/null +++ b/tests/test-sessions/test-hazelcast-sessions/src/test/resources/jetty-logging.properties @@ -0,0 +1 @@ +org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog \ No newline at end of file diff --git a/tests/test-sessions/test-hazelcast-sessions/src/test/resources/logging.properties b/tests/test-sessions/test-hazelcast-sessions/src/test/resources/logging.properties new file mode 100644 index 00000000000..256dbe49265 --- /dev/null +++ b/tests/test-sessions/test-hazelcast-sessions/src/test/resources/logging.properties @@ -0,0 +1,3 @@ +handlers=java.util.logging.ConsoleHandler +.level=INFO +com.hazelcast.level=SEVERE \ No newline at end of file