From 0c021f2599cd13c477e48d38b03b51880aa54e58 Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Thu, 14 Dec 2017 23:08:08 +0100 Subject: [PATCH 1/5] EWYK cleanups from #1970 Signed-off-by: Greg Wilkins --- .../util/thread/strategy/EatWhatYouKill.java | 99 ++++++++++--------- 1 file changed, 52 insertions(+), 47 deletions(-) diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java index db26786c0e4..bb81fc6b5a7 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java @@ -19,6 +19,8 @@ package org.eclipse.jetty.util.thread.strategy; import java.io.Closeable; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.LongAdder; @@ -32,8 +34,6 @@ import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.ExecutionStrategy; import org.eclipse.jetty.util.thread.Invocable; import org.eclipse.jetty.util.thread.Invocable.InvocationType; -import org.eclipse.jetty.util.thread.Locker; -import org.eclipse.jetty.util.thread.Locker.Lock; import org.eclipse.jetty.util.thread.ReservedThreadExecutor; /** @@ -66,9 +66,8 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat { private static final Logger LOG = Log.getLogger(EatWhatYouKill.class); - private enum State { IDLE, PRODUCING, REPRODUCING } - - private final Locker _locker = new Locker(); + private enum State { IDLE, PENDING, PRODUCING, REPRODUCING } + private final LongAdder _nonBlocking = new LongAdder(); private final LongAdder _blocking = new LongAdder(); private final LongAdder _executed = new LongAdder(); @@ -94,19 +93,20 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat _producers = producers; addBean(_producer); if (LOG.isDebugEnabled()) - LOG.debug("{} created", this); + LOG.debug("{} created", this); } @Override public void dispatch() { boolean execute = false; - try (Lock locked = _locker.lock()) + synchronized(this) { switch(_state) { case IDLE: execute = true; + _state = State.PENDING; break; case PRODUCING: @@ -136,19 +136,19 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat { if (LOG.isDebugEnabled()) LOG.debug("{} produce", this); - boolean reproduce = true; - while(isRunning() && tryProduce(reproduce) && doProduce()) - reproduce = false; + if (tryProduce()) + doProduce(); } - public boolean tryProduce(boolean reproduce) + private boolean tryProduce() { boolean producing = false; - try (Lock locked = _locker.lock()) + synchronized(this) { switch (_state) { case IDLE: + case PENDING: // Enter PRODUCING _state = State.PRODUCING; producing = true; @@ -156,8 +156,7 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat case PRODUCING: // Keep other Thread producing - if (reproduce) - _state = State.REPRODUCING; + _state = State.REPRODUCING; break; default: @@ -167,7 +166,7 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat return producing; } - public boolean doProduce() + private void doProduce() { boolean producing = true; while (isRunning() && producing) @@ -178,30 +177,28 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat { task = _producer.produce(); } - catch(Throwable e) + catch (Throwable e) { LOG.warn(e); } - - if (LOG.isDebugEnabled()) - LOG.debug("{} t={}/{}",this,task,Invocable.getInvocationType(task)); - + if (task==null) { - try (Lock locked = _locker.lock()) + synchronized(this) { - // Could another one just have been queued with a produce call? - if (_state==State.REPRODUCING) + // Could another task just have been queued with a produce call? + switch (_state) { - _state = State.PRODUCING; - } - else - { - if (LOG.isDebugEnabled()) - LOG.debug("{} IDLE",toStringLocked()); - _state = State.IDLE; - producing = false; - } + case PRODUCING: + _state = State.IDLE; + producing = false; + break; + case REPRODUCING: + _state = State.PRODUCING; + break; + default: + throw new IllegalStateException(toStringLocked()); + } } } else @@ -209,21 +206,19 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat boolean consume; if (Invocable.getInvocationType(task) == InvocationType.NON_BLOCKING) { - // PRODUCE CONSUME (EWYK!) - if (LOG.isDebugEnabled()) - LOG.debug("{} PC t={}", this, task); + // PRODUCE CONSUME consume = true; - _nonBlocking.increment(); + _nonBlocking.increment(); } else { - try (Lock locked = _locker.lock()) + synchronized(this) { if (_producers.tryExecute(this)) { // EXECUTE PRODUCE CONSUME! // We have executed a new Producer, so we can EWYK consume - _state = State.IDLE; + _state = State.PENDING; producing = false; consume = true; _blocking.increment(); @@ -233,13 +228,13 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat // PRODUCE EXECUTE CONSUME! consume = false; _executed.increment(); - } + } } - - if (LOG.isDebugEnabled()) - LOG.debug("{} {} t={}", this, consume ? "EPC" : "PEC", task); } + if (LOG.isDebugEnabled()) + LOG.debug("{} p={} c={} t={}/{}", this, producing, consume, task,Invocable.getInvocationType(task)); + // Consume or execute task try { @@ -250,7 +245,10 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat } catch (RejectedExecutionException e) { - LOG.warn(e); + if (isRunning()) + LOG.warn(e); + else + LOG.ignore(e); if (task instanceof Closeable) { try @@ -269,8 +267,6 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat } } } - - return producing; } @ManagedAttribute(value = "number of non blocking tasks consumed", readonly = true) @@ -294,7 +290,7 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat @ManagedAttribute(value = "whether this execution strategy is idle", readonly = true) public boolean isIdle() { - try (Lock locked = _locker.lock()) + synchronized(this) { return _state==State.IDLE; } @@ -310,7 +306,7 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat public String toString() { - try (Lock locked = _locker.lock()) + synchronized(this) { return toStringLocked(); } @@ -339,5 +335,14 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat builder.append(_state); builder.append('/'); builder.append(_producers); + builder.append("[nb="); + builder.append(getNonBlockingTasksConsumed()); + builder.append(",c="); + builder.append(getBlockingTasksConsumed()); + builder.append(",e="); + builder.append(getBlockingTasksExecuted()); + builder.append("]"); + builder.append("@"); + builder.append(DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())); } } From 8f6978dd36fa740d064605a7d64070161d1f320c Mon Sep 17 00:00:00 2001 From: H1Gdev Date: Tue, 19 Dec 2017 11:07:15 +0900 Subject: [PATCH 2/5] Fix typo in WebSocket doc. Signed-off-by: H1Gdev --- .../websockets/jetty/jetty-websocket-api-send-message.adoc | 2 +- .../websockets/jetty/jetty-websocket-api-session.adoc | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/jetty-documentation/src/main/asciidoc/development/websockets/jetty/jetty-websocket-api-send-message.adoc b/jetty-documentation/src/main/asciidoc/development/websockets/jetty/jetty-websocket-api-send-message.adoc index d89a77bf391..49afbe47204 100644 --- a/jetty-documentation/src/main/asciidoc/development/websockets/jetty/jetty-websocket-api-send-message.adoc +++ b/jetty-documentation/src/main/asciidoc/development/websockets/jetty/jetty-websocket-api-send-message.adoc @@ -17,7 +17,7 @@ [[jetty-websocket-api-send-message]] === Send Messages to Remote Endpoint -The most important feature of the Session is access to the link:{JDURL}/org/eclipse/jetty/websocket/api/RemoteEndpoint.html[`org.eclipse.jetty.websocket.api.RemoteEndpoint`]needed to send messages. +The most important feature of the Session is access to the link:{JDURL}/org/eclipse/jetty/websocket/api/RemoteEndpoint.html[`org.eclipse.jetty.websocket.api.RemoteEndpoint`] needed to send messages. With RemoteEndpoint you can choose to send TEXT or BINARY WebSocket messages, or the WebSocket PING and PONG control frames. diff --git a/jetty-documentation/src/main/asciidoc/development/websockets/jetty/jetty-websocket-api-session.adoc b/jetty-documentation/src/main/asciidoc/development/websockets/jetty/jetty-websocket-api-session.adoc index abcee12d5f5..754e0044c0e 100644 --- a/jetty-documentation/src/main/asciidoc/development/websockets/jetty/jetty-websocket-api-session.adoc +++ b/jetty-documentation/src/main/asciidoc/development/websockets/jetty/jetty-websocket-api-session.adoc @@ -44,7 +44,7 @@ What was in the Upgrade Request and Response. UpgradeRequest req = session.getUpgradeRequest(); String channelName = req.getParameterMap().get("channelName"); -UpgradeRespons resp = session.getUpgradeResponse(); +UpgradeResponse resp = session.getUpgradeResponse(); String subprotocol = resp.getAcceptedSubProtocol(); ---- From 005ae95beab5e11a72b876c8d8e41e205091947d Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Thu, 21 Dec 2017 02:34:39 -0800 Subject: [PATCH 3/5] Fixes #1949 - Client-side problems with digest authentication. (#1991) Introduced ContentProvider.isReproducible() to detect whether the request content can be provided more than once, and modified ContentProvider implementation accordingly. Modified AuthenticationProtocolHandler to not send an authenticated request if the content is not reproducible. Modified AuthenticationProtocolHandler to tolerate request failures. Signed-off-by: Simone Bordet --- .../client/AuthenticationProtocolHandler.java | 18 +- .../jetty/client/api/ContentProvider.java | 17 ++ .../util/ByteBufferContentProvider.java | 12 +- .../client/util/BytesContentProvider.java | 12 +- .../client/util/PathContentProvider.java | 6 + .../client/AbstractHttpClientServerTest.java | 2 + .../client/HttpClientAuthenticationTest.java | 166 +++++++++++++++--- 7 files changed, 193 insertions(+), 40 deletions(-) diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/AuthenticationProtocolHandler.java b/jetty-client/src/main/java/org/eclipse/jetty/client/AuthenticationProtocolHandler.java index 3fe68cc340d..5531f289740 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/AuthenticationProtocolHandler.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/AuthenticationProtocolHandler.java @@ -27,6 +27,7 @@ import java.util.regex.Pattern; import org.eclipse.jetty.client.api.Authentication; import org.eclipse.jetty.client.api.Connection; +import org.eclipse.jetty.client.api.ContentProvider; import org.eclipse.jetty.client.api.ContentResponse; import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Response; @@ -86,7 +87,7 @@ public abstract class AuthenticationProtocolHandler implements ProtocolHandler { HttpRequest request = (HttpRequest)result.getRequest(); ContentResponse response = new HttpContentResponse(result.getResponse(), getContent(), getMediaType(), getEncoding()); - if (result.isFailed()) + if (result.getResponseFailure() != null) { if (LOG.isDebugEnabled()) LOG.debug("Authentication challenge failed {}", result.getFailure()); @@ -98,7 +99,7 @@ public abstract class AuthenticationProtocolHandler implements ProtocolHandler HttpConversation conversation = request.getConversation(); if (conversation.getAttribute(authenticationAttribute) != null) { - // We have already tried to authenticate, but we failed again + // We have already tried to authenticate, but we failed again. if (LOG.isDebugEnabled()) LOG.debug("Bad credentials for {}", request); forwardSuccessComplete(request, response); @@ -111,7 +112,7 @@ public abstract class AuthenticationProtocolHandler implements ProtocolHandler { if (LOG.isDebugEnabled()) LOG.debug("Authentication challenge without {} header", header); - forwardFailureComplete(request, null, response, new HttpResponseException("HTTP protocol violation: Authentication challenge without " + header + " header", response)); + forwardFailureComplete(request, result.getRequestFailure(), response, new HttpResponseException("HTTP protocol violation: Authentication challenge without " + header + " header", response)); return; } @@ -138,9 +139,18 @@ public abstract class AuthenticationProtocolHandler implements ProtocolHandler return; } + ContentProvider requestContent = request.getContent(); + if (requestContent != null && !requestContent.isReproducible()) + { + if (LOG.isDebugEnabled()) + LOG.debug("Request content not reproducible for {}", request); + forwardSuccessComplete(request, response); + return; + } + try { - final Authentication.Result authnResult = authentication.authenticate(request, response, headerInfo, conversation); + Authentication.Result authnResult = authentication.authenticate(request, response, headerInfo, conversation); if (LOG.isDebugEnabled()) LOG.debug("Authentication result {}", authnResult); if (authnResult == null) diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/api/ContentProvider.java b/jetty-client/src/main/java/org/eclipse/jetty/client/api/ContentProvider.java index 03754f72c9b..6419b11e8f4 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/api/ContentProvider.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/api/ContentProvider.java @@ -22,6 +22,7 @@ import java.io.Closeable; import java.nio.ByteBuffer; import java.util.Iterator; +import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.util.ByteBufferContentProvider; import org.eclipse.jetty.client.util.PathContentProvider; @@ -48,6 +49,22 @@ public interface ContentProvider extends Iterable */ long getLength(); + /** + *

Whether this ContentProvider can produce exactly the same content more + * than once.

+ *

Implementations should return {@code true} only if the content can be + * produced more than once, which means that invocations to {@link #iterator()} + * must return a new, independent, iterator instance over the content.

+ *

The {@link HttpClient} implementation may use this method in particular + * cases where it detects that it is safe to retry a request that failed.

+ * + * @return whether the content can be produced more than once + */ + default boolean isReproducible() + { + return false; + } + /** * An extension of {@link ContentProvider} that provides a content type string * to be used as a {@code Content-Type} HTTP header in requests. diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/util/ByteBufferContentProvider.java b/jetty-client/src/main/java/org/eclipse/jetty/client/util/ByteBufferContentProvider.java index 68386758da9..2bc5233f8be 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/util/ByteBufferContentProvider.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/util/ByteBufferContentProvider.java @@ -57,6 +57,12 @@ public class ByteBufferContentProvider extends AbstractTypedContentProvider return length; } + @Override + public boolean isReproducible() + { + return true; + } + @Override public Iterator iterator() { @@ -85,12 +91,6 @@ public class ByteBufferContentProvider extends AbstractTypedContentProvider throw new NoSuchElementException(); } } - - @Override - public void remove() - { - throw new UnsupportedOperationException(); - } }; } } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/util/BytesContentProvider.java b/jetty-client/src/main/java/org/eclipse/jetty/client/util/BytesContentProvider.java index d2c080aa700..39f5fd5ad51 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/util/BytesContentProvider.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/util/BytesContentProvider.java @@ -53,6 +53,12 @@ public class BytesContentProvider extends AbstractTypedContentProvider return length; } + @Override + public boolean isReproducible() + { + return true; + } + @Override public Iterator iterator() { @@ -78,12 +84,6 @@ public class BytesContentProvider extends AbstractTypedContentProvider throw new NoSuchElementException(); } } - - @Override - public void remove() - { - throw new UnsupportedOperationException(); - } }; } } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/util/PathContentProvider.java b/jetty-client/src/main/java/org/eclipse/jetty/client/util/PathContentProvider.java index d1051d94136..00147a556df 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/util/PathContentProvider.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/util/PathContentProvider.java @@ -85,6 +85,12 @@ public class PathContentProvider extends AbstractTypedContentProvider return fileSize; } + @Override + public boolean isReproducible() + { + return true; + } + public ByteBufferPool getByteBufferPool() { return bufferPool; diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/AbstractHttpClientServerTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/AbstractHttpClientServerTest.java index b591cd7481f..4eea728f923 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/AbstractHttpClientServerTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/AbstractHttpClientServerTest.java @@ -27,6 +27,7 @@ import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.toolchain.test.TestTracker; +import org.eclipse.jetty.util.SocketAddressResolver; import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.junit.After; @@ -96,6 +97,7 @@ public abstract class AbstractHttpClientServerTest clientThreads.setName("client"); client = new HttpClient(transport, sslContextFactory); client.setExecutor(clientThreads); + client.setSocketAddressResolver(new SocketAddressResolver.Sync()); client.start(); } diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientAuthenticationTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientAuthenticationTest.java index 6e1a5ca074d..14d23267228 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientAuthenticationTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientAuthenticationTest.java @@ -22,11 +22,14 @@ import java.io.File; import java.io.IOException; import java.net.URI; import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; +import java.util.Iterator; +import java.util.NoSuchElementException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.IntFunction; import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; @@ -34,6 +37,7 @@ import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.client.api.Authentication; import org.eclipse.jetty.client.api.AuthenticationStore; +import org.eclipse.jetty.client.api.ContentProvider; import org.eclipse.jetty.client.api.ContentResponse; import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Response; @@ -41,6 +45,7 @@ import org.eclipse.jetty.client.api.Result; import org.eclipse.jetty.client.util.BasicAuthentication; import org.eclipse.jetty.client.util.DeferredContentProvider; import org.eclipse.jetty.client.util.DigestAuthentication; +import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.security.Authenticator; import org.eclipse.jetty.security.ConstraintMapping; import org.eclipse.jetty.security.ConstraintSecurityHandler; @@ -220,7 +225,7 @@ public class HttpClientAuthenticationTest extends AbstractHttpClientServerTest { baseRequest.setHandled(true); if (requests.incrementAndGet() == 1) - response.sendRedirect(URIUtil.newURI(scheme,request.getServerName(),request.getServerPort(),request.getRequestURI(),null)); + response.sendRedirect(URIUtil.newURI(scheme, request.getServerName(), request.getServerPort(), request.getRequestURI(), null)); } }); @@ -259,7 +264,7 @@ public class HttpClientAuthenticationTest extends AbstractHttpClientServerTest { baseRequest.setHandled(true); if (request.getRequestURI().endsWith("/redirect")) - response.sendRedirect(URIUtil.newURI(scheme,request.getServerName(),request.getServerPort(),"/secure",null)); + response.sendRedirect(URIUtil.newURI(scheme, request.getServerName(), request.getServerPort(), "/secure", null)); } }); @@ -424,6 +429,40 @@ public class HttpClientAuthenticationTest extends AbstractHttpClientServerTest Assert.assertEquals(1, requests.get()); } + @Test + public void test_NonReproducibleContent() throws Exception + { + startBasic(new EmptyServerHandler()); + + AuthenticationStore authenticationStore = client.getAuthenticationStore(); + URI uri = URI.create(scheme + "://localhost:" + connector.getLocalPort()); + BasicAuthentication authentication = new BasicAuthentication(uri, realm, "basic", "basic"); + authenticationStore.addAuthentication(authentication); + + CountDownLatch resultLatch = new CountDownLatch(1); + byte[] data = new byte[]{'h', 'e', 'l', 'l', 'o'}; + DeferredContentProvider content = new DeferredContentProvider(ByteBuffer.wrap(data)) + { + @Override + public boolean isReproducible() + { + return false; + } + }; + Request request = client.newRequest(uri) + .path("/secure") + .content(content); + request.send(result -> + { + if (result.isSucceeded() && result.getResponse().getStatus() == HttpStatus.UNAUTHORIZED_401) + resultLatch.countDown(); + }); + + content.close(); + + Assert.assertTrue(resultLatch.await(5, TimeUnit.SECONDS)); + } + @Test public void test_RequestFailsAfterResponse() throws Exception { @@ -434,32 +473,111 @@ public class HttpClientAuthenticationTest extends AbstractHttpClientServerTest BasicAuthentication authentication = new BasicAuthentication(uri, realm, "basic", "basic"); authenticationStore.addAuthentication(authentication); - CountDownLatch successLatch = new CountDownLatch(1); + AtomicBoolean fail = new AtomicBoolean(true); + GeneratingContentProvider content = new GeneratingContentProvider(index -> + { + switch (index) + { + case 0: + return ByteBuffer.wrap(new byte[]{'h', 'e', 'l', 'l', 'o'}); + case 1: + return ByteBuffer.wrap(new byte[]{'w', 'o', 'r', 'l', 'd'}); + case 2: + if (fail.compareAndSet(true, false)) + { + // Wait for the 401 response to arrive + // to the authentication protocol handler. + sleep(1000); + // Trigger request failure. + throw new RuntimeException(); + } + else + { + return null; + } + default: + throw new IllegalStateException(); + } + }); CountDownLatch resultLatch = new CountDownLatch(1); - DeferredContentProvider content = new DeferredContentProvider(); - Request request = client.newRequest("localhost", connector.getLocalPort()) + client.newRequest("localhost", connector.getLocalPort()) .scheme(scheme) .path("/secure") .content(content) - .onResponseSuccess(response -> successLatch.countDown()); - request.send(result -> - { - if (result.isFailed() && result.getResponseFailure() == null) - resultLatch.countDown(); - }); + .send(result -> + { + if (result.isSucceeded() && result.getResponse().getStatus() == HttpStatus.OK_200) + resultLatch.countDown(); + }); - // Send some content to make sure the request is dispatched on the server. - content.offer(ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8))); - - // Wait for the response to arrive to - // the authentication protocol handler. - Thread.sleep(1000); - - // Trigger request failure. - request.abort(new Exception()); - - // Verify that the response was successful, it's the request that failed. - Assert.assertTrue(successLatch.await(5, TimeUnit.SECONDS)); Assert.assertTrue(resultLatch.await(5, TimeUnit.SECONDS)); } + + private void sleep(long time) + { + try + { + Thread.sleep(time); + } + catch (InterruptedException x) + { + throw new RuntimeException(x); + } + } + + private static class GeneratingContentProvider implements ContentProvider + { + private static final ByteBuffer DONE = ByteBuffer.allocate(0); + + private final IntFunction generator; + + private GeneratingContentProvider(IntFunction generator) + { + this.generator = generator; + } + + @Override + public long getLength() + { + return -1; + } + + @Override + public boolean isReproducible() + { + return true; + } + + @Override + public Iterator iterator() + { + return new Iterator() + { + private int index; + public ByteBuffer current; + + @Override + public boolean hasNext() + { + if (current == null) + { + current = generator.apply(index++); + if (current == null) + current = DONE; + } + return current != DONE; + } + + @Override + public ByteBuffer next() + { + ByteBuffer result = current; + current = null; + if (result == null) + throw new NoSuchElementException(); + return result; + } + }; + } + } } From 696b60a541bf3b72c8f7b17eee4830ad6ff7485e Mon Sep 17 00:00:00 2001 From: olivier lamy Date: Wed, 13 Dec 2017 22:09:54 +1100 Subject: [PATCH 4/5] ensure we really use only client test Signed-off-by: olivier lamy use a randomized map name to store sessions in order to avoid conflicts Signed-off-by: olivier lamy reduce hazelcast logging noise Signed-off-by: olivier lamy clarify onlyClient mode Signed-off-by: olivier lamy --- .../HazelcastSessionDataStoreFactory.java | 5 ++++ .../test-hazelcast-sessions/pom.xml | 10 ++++++- .../session/ClusteredLastAccessTimeTest.java | 12 +++++++- .../session/ClusteredOrphanedSessionTest.java | 11 +++++-- .../ClusteredSessionMigrationTest.java | 11 ++++++- .../ClusteredSessionScavengingTest.java | 12 +++++++- .../ModifyMaxInactiveIntervalTest.java | 12 +++++++- .../NonClusteredSessionScavengingTest.java | 12 +++++++- .../hazelcast/session/SessionExpiryTest.java | 12 +++++++- .../SessionInvalidateCreateScavengeTest.java | 12 +++++++- .../client/ClientLastAccessTimeTest.java | 2 +- .../ClientModifyMaxInactiveIntervalTest.java | 29 +++++++++++++++++++ ...ientNonClusteredSessionScavengingTest.java | 2 +- .../client/ClientOrphanedSessionTest.java | 2 +- .../client/ClientSessionExpiryTest.java | 2 +- ...ntSessionInvalidateCreateScavengeTest.java | 2 +- .../client/ClientSessionMigrationTest.java | 2 +- .../client/ClientSessionScavengingTest.java | 2 +- .../test/resources/jetty-logging.properties | 1 + .../src/test/resources/logging.properties | 3 ++ 20 files changed, 139 insertions(+), 17 deletions(-) create mode 100644 tests/test-sessions/test-hazelcast-sessions/src/test/resources/jetty-logging.properties create mode 100644 tests/test-sessions/test-hazelcast-sessions/src/test/resources/logging.properties diff --git a/jetty-hazelcast/src/main/java/org/eclipse/jetty/hazelcast/session/HazelcastSessionDataStoreFactory.java b/jetty-hazelcast/src/main/java/org/eclipse/jetty/hazelcast/session/HazelcastSessionDataStoreFactory.java index 7959f8a5072..766920ed9c3 100644 --- a/jetty-hazelcast/src/main/java/org/eclipse/jetty/hazelcast/session/HazelcastSessionDataStoreFactory.java +++ b/jetty-hazelcast/src/main/java/org/eclipse/jetty/hazelcast/session/HazelcastSessionDataStoreFactory.java @@ -121,6 +121,11 @@ public class HazelcastSessionDataStoreFactory return onlyClient; } + /** + * + * @param onlyClient if true the session manager will only connect to an external Hazelcast instance + * and not use this JVM to start an Hazelcast instance + */ public void setOnlyClient( boolean onlyClient ) { this.onlyClient = onlyClient; diff --git a/tests/test-sessions/test-hazelcast-sessions/pom.xml b/tests/test-sessions/test-hazelcast-sessions/pom.xml index 2bd1287e31e..9ad03019564 100644 --- a/tests/test-sessions/test-hazelcast-sessions/pom.xml +++ b/tests/test-sessions/test-hazelcast-sessions/pom.xml @@ -49,6 +49,15 @@ + + org.apache.maven.plugins + maven-surefire-plugin + + + ${project.build.testOutputDirectory}/logging.properties + + + @@ -88,7 +97,6 @@ jetty-test-helper test - diff --git a/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/ClusteredLastAccessTimeTest.java b/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/ClusteredLastAccessTimeTest.java index aabffb8dfda..204897ee78d 100644 --- a/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/ClusteredLastAccessTimeTest.java +++ b/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/ClusteredLastAccessTimeTest.java @@ -20,19 +20,29 @@ package org.eclipse.jetty.hazelcast.session; import org.eclipse.jetty.server.session.AbstractClusteredLastAccessTimeTest; import org.eclipse.jetty.server.session.SessionDataStoreFactory; +import org.junit.After; public class ClusteredLastAccessTimeTest extends AbstractClusteredLastAccessTimeTest { + HazelcastSessionDataStoreFactory factory; + /** * @see org.eclipse.jetty.server.session.AbstractTestBase#createSessionDataStoreFactory() */ @Override public SessionDataStoreFactory createSessionDataStoreFactory() { - HazelcastSessionDataStoreFactory factory = new HazelcastSessionDataStoreFactory(); + factory = new HazelcastSessionDataStoreFactory(); + factory.setMapName( Long.toString( System.currentTimeMillis() ) ); return factory; } + @After + public void shutdown() + { + factory.getHazelcastInstance().getMap( factory.getMapName() ).clear(); + } + } diff --git a/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/ClusteredOrphanedSessionTest.java b/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/ClusteredOrphanedSessionTest.java index e5a0be0a09a..466f430b647 100644 --- a/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/ClusteredOrphanedSessionTest.java +++ b/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/ClusteredOrphanedSessionTest.java @@ -21,6 +21,7 @@ package org.eclipse.jetty.hazelcast.session; import org.eclipse.jetty.server.session.AbstractClusteredOrphanedSessionTest; import org.eclipse.jetty.server.session.SessionDataStoreFactory; +import org.junit.After; /** * ClusteredOrphanedSessionTest @@ -29,6 +30,7 @@ public class ClusteredOrphanedSessionTest extends AbstractClusteredOrphanedSessionTest { + HazelcastSessionDataStoreFactory factory; /** * @see org.eclipse.jetty.server.session.AbstractTestBase#createSessionDataStoreFactory() @@ -36,9 +38,14 @@ public class ClusteredOrphanedSessionTest @Override public SessionDataStoreFactory createSessionDataStoreFactory() { - HazelcastSessionDataStoreFactory factory = new HazelcastSessionDataStoreFactory(); + factory = new HazelcastSessionDataStoreFactory(); + factory.setMapName( Long.toString( System.currentTimeMillis() ) ); return factory; } - + @After + public void shutdown() + { + factory.getHazelcastInstance().getMap( factory.getMapName() ).clear(); + } } diff --git a/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/ClusteredSessionMigrationTest.java b/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/ClusteredSessionMigrationTest.java index b6a310f22ff..361aac257bc 100644 --- a/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/ClusteredSessionMigrationTest.java +++ b/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/ClusteredSessionMigrationTest.java @@ -21,6 +21,7 @@ package org.eclipse.jetty.hazelcast.session; import org.eclipse.jetty.server.session.AbstractClusteredSessionMigrationTest; import org.eclipse.jetty.server.session.SessionDataStoreFactory; +import org.junit.After; /** * ClusteredSessionMigrationTest @@ -28,6 +29,7 @@ import org.eclipse.jetty.server.session.SessionDataStoreFactory; public class ClusteredSessionMigrationTest extends AbstractClusteredSessionMigrationTest { + HazelcastSessionDataStoreFactory factory; /** * @see org.eclipse.jetty.server.session.AbstractTestBase#createSessionDataStoreFactory() @@ -35,7 +37,14 @@ public class ClusteredSessionMigrationTest @Override public SessionDataStoreFactory createSessionDataStoreFactory() { - HazelcastSessionDataStoreFactory factory = new HazelcastSessionDataStoreFactory(); + factory = new HazelcastSessionDataStoreFactory(); + factory.setMapName( Long.toString( System.currentTimeMillis() ) ); return factory; } + + @After + public void shutdown() + { + factory.getHazelcastInstance().getMap( factory.getMapName() ).clear(); + } } diff --git a/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/ClusteredSessionScavengingTest.java b/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/ClusteredSessionScavengingTest.java index 0d626c33fa7..e4bd2f8868f 100644 --- a/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/ClusteredSessionScavengingTest.java +++ b/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/ClusteredSessionScavengingTest.java @@ -21,6 +21,7 @@ package org.eclipse.jetty.hazelcast.session; import org.eclipse.jetty.server.session.AbstractClusteredSessionScavengingTest; import org.eclipse.jetty.server.session.SessionDataStoreFactory; +import org.junit.After; /** * ClusteredSessionScavengingTest @@ -29,13 +30,22 @@ public class ClusteredSessionScavengingTest extends AbstractClusteredSessionScavengingTest { + HazelcastSessionDataStoreFactory factory; + /** * @see org.eclipse.jetty.server.session.AbstractTestBase#createSessionDataStoreFactory() */ @Override public SessionDataStoreFactory createSessionDataStoreFactory() { - HazelcastSessionDataStoreFactory factory = new HazelcastSessionDataStoreFactory(); + factory = new HazelcastSessionDataStoreFactory(); + factory.setMapName( Long.toString( System.currentTimeMillis() ) ); return factory; } + + @After + public void shutdown() + { + factory.getHazelcastInstance().getMap( factory.getMapName() ).clear(); + } } diff --git a/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/ModifyMaxInactiveIntervalTest.java b/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/ModifyMaxInactiveIntervalTest.java index 5570d0946ba..68d9a51d468 100644 --- a/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/ModifyMaxInactiveIntervalTest.java +++ b/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/ModifyMaxInactiveIntervalTest.java @@ -21,6 +21,7 @@ package org.eclipse.jetty.hazelcast.session; import org.eclipse.jetty.server.session.AbstractModifyMaxInactiveIntervalTest; import org.eclipse.jetty.server.session.SessionDataStoreFactory; +import org.junit.After; /** * ModifyMaxInactiveIntervalTest @@ -29,14 +30,23 @@ public class ModifyMaxInactiveIntervalTest extends AbstractModifyMaxInactiveIntervalTest { + HazelcastSessionDataStoreFactory factory; + /** * @see org.eclipse.jetty.server.session.AbstractTestBase#createSessionDataStoreFactory() */ @Override public SessionDataStoreFactory createSessionDataStoreFactory() { - HazelcastSessionDataStoreFactory factory = new HazelcastSessionDataStoreFactory(); + factory = new HazelcastSessionDataStoreFactory(); + factory.setMapName( Long.toString( System.currentTimeMillis() ) ); return factory; } + @After + public void shutdown() + { + factory.getHazelcastInstance().getMap( factory.getMapName() ).clear(); + } + } diff --git a/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/NonClusteredSessionScavengingTest.java b/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/NonClusteredSessionScavengingTest.java index 8c024268b0f..e364bb085ad 100644 --- a/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/NonClusteredSessionScavengingTest.java +++ b/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/NonClusteredSessionScavengingTest.java @@ -21,6 +21,7 @@ package org.eclipse.jetty.hazelcast.session; import org.eclipse.jetty.server.session.AbstractNonClusteredSessionScavengingTest; import org.eclipse.jetty.server.session.SessionDataStoreFactory; +import org.junit.After; import static org.junit.Assert.*; @@ -31,6 +32,8 @@ public class NonClusteredSessionScavengingTest extends AbstractNonClusteredSessionScavengingTest { + HazelcastSessionDataStoreFactory factory; + /** * @see org.eclipse.jetty.server.session.AbstractNonClusteredSessionScavengingTest#assertSession(java.lang.String, boolean) */ @@ -63,7 +66,14 @@ public class NonClusteredSessionScavengingTest @Override public SessionDataStoreFactory createSessionDataStoreFactory() { - HazelcastSessionDataStoreFactory factory = new HazelcastSessionDataStoreFactory(); + factory = new HazelcastSessionDataStoreFactory(); + factory.setMapName( Long.toString( System.currentTimeMillis() ) ); return factory; } + + @After + public void shutdown() + { + factory.getHazelcastInstance().getMap( factory.getMapName() ).clear(); + } } diff --git a/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/SessionExpiryTest.java b/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/SessionExpiryTest.java index ae911a0614b..2d51f25109d 100644 --- a/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/SessionExpiryTest.java +++ b/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/SessionExpiryTest.java @@ -21,20 +21,30 @@ package org.eclipse.jetty.hazelcast.session; import org.eclipse.jetty.server.session.AbstractSessionExpiryTest; import org.eclipse.jetty.server.session.SessionDataStoreFactory; +import org.junit.After; import org.junit.Test; public class SessionExpiryTest extends AbstractSessionExpiryTest { + HazelcastSessionDataStoreFactory factory; + /** * @see org.eclipse.jetty.server.session.AbstractTestBase#createSessionDataStoreFactory() */ @Override public SessionDataStoreFactory createSessionDataStoreFactory() { - HazelcastSessionDataStoreFactory factory = new HazelcastSessionDataStoreFactory(); + factory = new HazelcastSessionDataStoreFactory(); + factory.setMapName( Long.toString( System.currentTimeMillis() ) ); return factory; } + @After + public void shutdown() + { + factory.getHazelcastInstance().getMap( factory.getMapName() ).clear(); + } + } diff --git a/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/SessionInvalidateCreateScavengeTest.java b/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/SessionInvalidateCreateScavengeTest.java index c958fb6e6c5..434b11595cb 100644 --- a/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/SessionInvalidateCreateScavengeTest.java +++ b/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/SessionInvalidateCreateScavengeTest.java @@ -21,6 +21,7 @@ package org.eclipse.jetty.hazelcast.session; import org.eclipse.jetty.server.session.AbstractSessionInvalidateCreateScavengeTest; import org.eclipse.jetty.server.session.SessionDataStoreFactory; +import org.junit.After; /** * SessionInvalidateCreateScavengeTest @@ -29,13 +30,22 @@ public class SessionInvalidateCreateScavengeTest extends AbstractSessionInvalidateCreateScavengeTest { + HazelcastSessionDataStoreFactory factory; + /** * @see org.eclipse.jetty.server.session.AbstractTestBase#createSessionDataStoreFactory() */ @Override public SessionDataStoreFactory createSessionDataStoreFactory() { - HazelcastSessionDataStoreFactory factory = new HazelcastSessionDataStoreFactory(); + factory = new HazelcastSessionDataStoreFactory(); + factory.setMapName( Long.toString( System.currentTimeMillis() ) ); return factory; } + + @After + public void shutdown() + { + factory.getHazelcastInstance().getMap( factory.getMapName() ).clear(); + } } diff --git a/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/client/ClientLastAccessTimeTest.java b/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/client/ClientLastAccessTimeTest.java index 2078882ee21..f2549d7b964 100644 --- a/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/client/ClientLastAccessTimeTest.java +++ b/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/client/ClientLastAccessTimeTest.java @@ -32,7 +32,7 @@ public class ClientLastAccessTimeTest extends AbstractClusteredLastAccessTimeTest { - private static final String MAP_NAME = "jetty_foo_session"; + private static final String MAP_NAME = Long.toString( System.currentTimeMillis() ); private HazelcastInstance hazelcastInstance; diff --git a/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/client/ClientModifyMaxInactiveIntervalTest.java b/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/client/ClientModifyMaxInactiveIntervalTest.java index f74d05746c9..effcafdecca 100644 --- a/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/client/ClientModifyMaxInactiveIntervalTest.java +++ b/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/client/ClientModifyMaxInactiveIntervalTest.java @@ -19,9 +19,15 @@ package org.eclipse.jetty.hazelcast.session.client; +import com.hazelcast.config.Config; +import com.hazelcast.config.MapConfig; +import com.hazelcast.core.Hazelcast; +import com.hazelcast.core.HazelcastInstance; import org.eclipse.jetty.hazelcast.session.HazelcastSessionDataStoreFactory; import org.eclipse.jetty.server.session.AbstractModifyMaxInactiveIntervalTest; import org.eclipse.jetty.server.session.SessionDataStoreFactory; +import org.junit.After; +import org.junit.Before; /** * ModifyMaxInactiveIntervalTest @@ -30,6 +36,27 @@ public class ClientModifyMaxInactiveIntervalTest extends AbstractModifyMaxInactiveIntervalTest { + private static final String MAP_NAME = Long.toString( System.currentTimeMillis() ); + + private HazelcastInstance hazelcastInstance; + + @Before + public void startHazelcast() + throws Exception + { + Config config = new Config().addMapConfig( new MapConfig().setName( MAP_NAME ) ) // + .setInstanceName( "beer" ); + // start Hazelcast instance + hazelcastInstance = Hazelcast.getOrCreateHazelcastInstance( config ); + } + + @After + public void stopHazelcast() + throws Exception + { + hazelcastInstance.shutdown(); + } + /** * @see org.eclipse.jetty.server.session.AbstractTestBase#createSessionDataStoreFactory() */ @@ -37,6 +64,8 @@ public class ClientModifyMaxInactiveIntervalTest public SessionDataStoreFactory createSessionDataStoreFactory() { HazelcastSessionDataStoreFactory factory = new HazelcastSessionDataStoreFactory(); + factory.setOnlyClient( true ); + factory.setMapName( MAP_NAME ); return factory; } diff --git a/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/client/ClientNonClusteredSessionScavengingTest.java b/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/client/ClientNonClusteredSessionScavengingTest.java index e9b00f20433..4abc6b1cedb 100644 --- a/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/client/ClientNonClusteredSessionScavengingTest.java +++ b/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/client/ClientNonClusteredSessionScavengingTest.java @@ -60,7 +60,7 @@ public class ClientNonClusteredSessionScavengingTest fail( e.getMessage() ); } } - private static final String MAP_NAME = "jetty_foo_session"; + private static final String MAP_NAME = Long.toString( System.currentTimeMillis() ); private HazelcastInstance hazelcastInstance; diff --git a/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/client/ClientOrphanedSessionTest.java b/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/client/ClientOrphanedSessionTest.java index 00943a5f036..2a08debb557 100644 --- a/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/client/ClientOrphanedSessionTest.java +++ b/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/client/ClientOrphanedSessionTest.java @@ -33,7 +33,7 @@ public class ClientOrphanedSessionTest extends AbstractClusteredOrphanedSessionTest { - private static final String MAP_NAME = "jetty_foo_session"; + private static final String MAP_NAME = Long.toString( System.currentTimeMillis() ); private HazelcastInstance hazelcastInstance; diff --git a/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/client/ClientSessionExpiryTest.java b/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/client/ClientSessionExpiryTest.java index 4533b980d8a..a2fc8a24c37 100644 --- a/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/client/ClientSessionExpiryTest.java +++ b/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/client/ClientSessionExpiryTest.java @@ -33,7 +33,7 @@ public class ClientSessionExpiryTest extends AbstractSessionExpiryTest { - private static final String MAP_NAME = "jetty_foo_session"; + private static final String MAP_NAME = Long.toString( System.currentTimeMillis() ); private HazelcastInstance hazelcastInstance; diff --git a/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/client/ClientSessionInvalidateCreateScavengeTest.java b/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/client/ClientSessionInvalidateCreateScavengeTest.java index 1219907e689..cedd39abcaa 100644 --- a/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/client/ClientSessionInvalidateCreateScavengeTest.java +++ b/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/client/ClientSessionInvalidateCreateScavengeTest.java @@ -32,7 +32,7 @@ import org.junit.Before; public class ClientSessionInvalidateCreateScavengeTest extends AbstractSessionInvalidateCreateScavengeTest { - private static final String MAP_NAME = "jetty_foo_session"; + private static final String MAP_NAME = Long.toString( System.currentTimeMillis() ); private HazelcastInstance hazelcastInstance; diff --git a/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/client/ClientSessionMigrationTest.java b/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/client/ClientSessionMigrationTest.java index e5dc2cc1a9a..ba786895c02 100644 --- a/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/client/ClientSessionMigrationTest.java +++ b/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/client/ClientSessionMigrationTest.java @@ -35,7 +35,7 @@ import org.junit.Before; public class ClientSessionMigrationTest extends AbstractClusteredSessionMigrationTest { - private static final String MAP_NAME = "jetty_foo_session"; + private static final String MAP_NAME = Long.toString( System.currentTimeMillis() ); private HazelcastInstance hazelcastInstance; diff --git a/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/client/ClientSessionScavengingTest.java b/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/client/ClientSessionScavengingTest.java index 06a34569462..e7ca25a3877 100644 --- a/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/client/ClientSessionScavengingTest.java +++ b/tests/test-sessions/test-hazelcast-sessions/src/test/java/org/eclipse/jetty/hazelcast/session/client/ClientSessionScavengingTest.java @@ -33,7 +33,7 @@ public class ClientSessionScavengingTest extends AbstractClusteredSessionScavengingTest { - private static final String MAP_NAME = "jetty_foo_session"; + private static final String MAP_NAME = Long.toString( System.currentTimeMillis() ); private HazelcastInstance hazelcastInstance; diff --git a/tests/test-sessions/test-hazelcast-sessions/src/test/resources/jetty-logging.properties b/tests/test-sessions/test-hazelcast-sessions/src/test/resources/jetty-logging.properties new file mode 100644 index 00000000000..07faa86dbce --- /dev/null +++ b/tests/test-sessions/test-hazelcast-sessions/src/test/resources/jetty-logging.properties @@ -0,0 +1 @@ +org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog \ No newline at end of file diff --git a/tests/test-sessions/test-hazelcast-sessions/src/test/resources/logging.properties b/tests/test-sessions/test-hazelcast-sessions/src/test/resources/logging.properties new file mode 100644 index 00000000000..256dbe49265 --- /dev/null +++ b/tests/test-sessions/test-hazelcast-sessions/src/test/resources/logging.properties @@ -0,0 +1,3 @@ +handlers=java.util.logging.ConsoleHandler +.level=INFO +com.hazelcast.level=SEVERE \ No newline at end of file From e86e8a752c4b3d9d0abe0ed58d271b2a125e3917 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Wed, 27 Dec 2017 06:07:32 -0800 Subject: [PATCH 5/5] Issue #1973 - Implement minimum response data rate (#2012) * Code cleanups. Signed-off-by: Simone Bordet * Improved test case handler. Signed-off-by: Simone Bordet * Improved exception message. Signed-off-by: Simone Bordet * Issue #1973 - Implement minimum response data rate. Implemented response content data rate control in HttpOutput. Introduced a WriteFlusher.Listener interface that produces events for every flush(). These events are forwarded to the Connection and from there to the HttpOutput so that the data rate control can be enforced. Both HTTP/1.1 and HTTP/2 are implemented. Data rate control for HTTP/1.1 is approximate because it will count also headers bytes and the chunk bytes, while for HTTP/2 is precise. Signed-off-by: Simone Bordet * Issue #1973 - Implement minimum response data rate. Addressed review comments. Signed-off-by: Simone Bordet --- .../eclipse/jetty/http2/HTTP2Connection.java | 10 +- .../org/eclipse/jetty/http2/HTTP2Flusher.java | 49 ++++- .../org/eclipse/jetty/http2/HTTP2Session.java | 55 +++-- .../http/HttpClientTransportOverHTTP2.java | 5 + .../http2/server/HttpChannelOverHTTP2.java | 9 +- .../org/eclipse/jetty/io/WriteFlusher.java | 204 ++++++++++-------- .../jetty/server/HttpConfiguration.java | 26 ++- .../eclipse/jetty/server/HttpConnection.java | 11 +- .../org/eclipse/jetty/server/HttpInput.java | 2 +- .../org/eclipse/jetty/server/HttpOutput.java | 42 +++- .../jetty/http/client/EmptyServerHandler.java | 11 +- .../jetty/http/client/HttpClientTest.java | 4 +- .../jetty/http/client/ServerTimeoutsTest.java | 74 +++++++ 13 files changed, 385 insertions(+), 117 deletions(-) diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java index 55572146c21..babbbf9fa84 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java @@ -28,6 +28,7 @@ import org.eclipse.jetty.http2.parser.Parser; import org.eclipse.jetty.io.AbstractConnection; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.EndPoint; +import org.eclipse.jetty.io.WriteFlusher; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.component.LifeCycle; @@ -37,7 +38,7 @@ import org.eclipse.jetty.util.thread.ExecutionStrategy; import org.eclipse.jetty.util.thread.ReservedThreadExecutor; import org.eclipse.jetty.util.thread.strategy.EatWhatYouKill; -public class HTTP2Connection extends AbstractConnection +public class HTTP2Connection extends AbstractConnection implements WriteFlusher.Listener { protected static final Logger LOG = Log.getLogger(HTTP2Connection.class); @@ -176,6 +177,13 @@ public class HTTP2Connection extends AbstractConnection } } + @Override + public void onFlushed(long bytes) throws IOException + { + // TODO: add method to ISession ? + ((HTTP2Session)session).onFlushed(bytes); + } + protected class HTTP2Producer implements ExecutionStrategy.Producer { private final Callback fillableCallback = new FillableCallback(); diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java index b3bb0ccf97a..94452f24ccc 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java @@ -30,6 +30,7 @@ import org.eclipse.jetty.http2.frames.Frame; import org.eclipse.jetty.http2.frames.WindowUpdateFrame; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.EofException; +import org.eclipse.jetty.io.WriteFlusher; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.IteratingCallback; import org.eclipse.jetty.util.component.ContainerLifeCycle; @@ -175,7 +176,7 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable { if (entry.generate(lease)) { - if (entry.dataRemaining() > 0) + if (entry.getDataBytesRemaining() > 0) entries.offer(entry); } else @@ -207,6 +208,31 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable return Action.SCHEDULED; } + void onFlushed(long bytes) throws IOException + { + // For the given flushed bytes, we want to only + // forward those that belong to data frame content. + for (Entry entry : actives) + { + int frameBytesLeft = entry.getFrameBytesRemaining(); + if (frameBytesLeft > 0) + { + int update = (int)Math.min(bytes, frameBytesLeft); + entry.onFrameBytesFlushed(update); + bytes -= update; + IStream stream = entry.stream; + if (stream != null && !entry.isControl()) + { + Object channel = stream.getAttribute(IStream.CHANNEL_ATTRIBUTE); + if (channel instanceof WriteFlusher.Listener) + ((WriteFlusher.Listener)channel).onFlushed(update - Frame.HEADER_LENGTH); + } + if (bytes == 0) + break; + } + } + } + @Override public void succeeded() { @@ -234,13 +260,13 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable for (int i = index; i < actives.size(); ++i) { Entry entry = actives.get(i); - if (entry.dataRemaining() > 0) + if (entry.getDataBytesRemaining() > 0) append(entry); } for (int i = 0; i < index; ++i) { Entry entry = actives.get(i); - if (entry.dataRemaining() > 0) + if (entry.getDataBytesRemaining() > 0) append(entry); } stalled = null; @@ -333,7 +359,11 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable this.stream = stream; } - public int dataRemaining() + public abstract int getFrameBytesRemaining(); + + public abstract void onFrameBytesFlushed(int bytesFlushed); + + public int getDataBytesRemaining() { return 0; } @@ -387,6 +417,17 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable } } + private boolean isControl() + { + switch (frame.getType()) + { + case DATA: + return false; + default: + return true; + } + } + @Override public String toString() { diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java index 32e7c136323..e8b1c317146 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java @@ -955,6 +955,11 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio { } + void onFlushed(long bytes) throws IOException + { + flusher.onFlushed(bytes); + } + public void disconnect() { if (LOG.isDebugEnabled()) @@ -1132,15 +1137,28 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio private class ControlEntry extends HTTP2Flusher.Entry { private int bytes; + private int frameBytes; private ControlEntry(Frame frame, IStream stream, Callback callback) { super(frame, stream, callback); } + @Override + public int getFrameBytesRemaining() + { + return frameBytes; + } + + @Override + public void onFrameBytesFlushed(int bytesFlushed) + { + frameBytes -= bytesFlushed; + } + protected boolean generate(ByteBufferPool.Lease lease) { - bytes = generator.control(lease, frame); + bytes = frameBytes = generator.control(lease, frame); if (LOG.isDebugEnabled()) LOG.debug("Generated {}", frame); prepare(); @@ -1238,7 +1256,8 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio private class DataEntry extends HTTP2Flusher.Entry { private int bytes; - private int dataRemaining; + private int frameBytes; + private int dataBytes; private int dataWritten; private DataEntry(DataFrame frame, IStream stream, Callback callback) @@ -1249,35 +1268,47 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio // of data frames that cannot be completely written due to // the flow control window exhausting, since in that case // we would have to count the padding only once. - dataRemaining = frame.remaining(); + dataBytes = frame.remaining(); } @Override - public int dataRemaining() + public int getFrameBytesRemaining() { - return dataRemaining; + return frameBytes; + } + + @Override + public void onFrameBytesFlushed(int bytesFlushed) + { + frameBytes -= bytesFlushed; + } + + @Override + public int getDataBytesRemaining() + { + return dataBytes; } protected boolean generate(ByteBufferPool.Lease lease) { - int dataRemaining = dataRemaining(); + int dataBytes = getDataBytesRemaining(); int sessionSendWindow = getSendWindow(); int streamSendWindow = stream.updateSendWindow(0); int window = Math.min(streamSendWindow, sessionSendWindow); - if (window <= 0 && dataRemaining > 0) + if (window <= 0 && dataBytes > 0) return false; - int length = Math.min(dataRemaining, window); + int length = Math.min(dataBytes, window); // Only one DATA frame is generated. - bytes = generator.data(lease, (DataFrame)frame, length); + bytes = frameBytes = generator.data(lease, (DataFrame)frame, length); int written = bytes - Frame.HEADER_LENGTH; if (LOG.isDebugEnabled()) - LOG.debug("Generated {}, length/window/data={}/{}/{}", frame, written, window, dataRemaining); + LOG.debug("Generated {}, length/window/data={}/{}/{}", frame, written, window, dataBytes); this.dataWritten = written; - this.dataRemaining -= written; + this.dataBytes -= written; flowControl.onDataSending(stream, written); @@ -1292,7 +1323,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio // Do we have more to send ? DataFrame dataFrame = (DataFrame)frame; - if (dataRemaining() == 0) + if (getDataBytesRemaining() == 0) { // Only now we can update the close state // and eventually remove the stream. diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2.java index 7815d9c946c..8bcc8f8a0f0 100644 --- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2.java +++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2.java @@ -62,6 +62,11 @@ public class HttpClientTransportOverHTTP2 extends AbstractHttpClientTransport }); } + public HTTP2Client getHTTP2Client() + { + return client; + } + @ManagedAttribute(value = "The number of selectors", readonly = true) public int getSelectors() { diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java index 1a8168c53b5..484308bf033 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java @@ -38,6 +38,7 @@ import org.eclipse.jetty.http2.frames.DataFrame; import org.eclipse.jetty.http2.frames.HeadersFrame; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.EndPoint; +import org.eclipse.jetty.io.WriteFlusher; import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.HttpChannel; import org.eclipse.jetty.server.HttpConfiguration; @@ -48,7 +49,7 @@ import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; -public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable +public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable, WriteFlusher.Listener { private static final Logger LOG = Log.getLogger(HttpChannelOverHTTP2.class); private static final HttpField SERVER_VERSION = new PreEncodedHttpField(HttpHeader.SERVER, HttpConfiguration.SERVER_VERSION); @@ -85,6 +86,12 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable return getStream().getIdleTimeout(); } + @Override + public void onFlushed(long bytes) throws IOException + { + getResponse().getHttpOutput().onFlushed(bytes); + } + public Runnable onRequest(HeadersFrame frame) { try diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java b/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java index 7f7c6070051..81a2cf32e2b 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java @@ -35,7 +35,6 @@ import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.Invocable; import org.eclipse.jetty.util.thread.Invocable.InvocationType; - /** * A Utility class to help implement {@link EndPoint#write(Callback, ByteBuffer...)} by calling * {@link EndPoint#flush(ByteBuffer...)} until all content is written. @@ -60,7 +59,7 @@ abstract public class WriteFlusher // fill the state machine __stateTransitions.put(StateType.IDLE, EnumSet.of(StateType.WRITING)); __stateTransitions.put(StateType.WRITING, EnumSet.of(StateType.IDLE, StateType.PENDING, StateType.FAILED)); - __stateTransitions.put(StateType.PENDING, EnumSet.of(StateType.COMPLETING,StateType.IDLE)); + __stateTransitions.put(StateType.PENDING, EnumSet.of(StateType.COMPLETING, StateType.IDLE)); __stateTransitions.put(StateType.COMPLETING, EnumSet.of(StateType.IDLE, StateType.PENDING, StateType.FAILED)); __stateTransitions.put(StateType.FAILED, EnumSet.of(StateType.IDLE)); } @@ -104,29 +103,30 @@ abstract public class WriteFlusher /** * Tries to update the current state to the given new state. + * * @param previous the expected current state - * @param next the desired new state + * @param next the desired new state * @return the previous state or null if the state transition failed * @throws WritePendingException if currentState is WRITING and new state is WRITING (api usage error) */ - private boolean updateState(State previous,State next) + private boolean updateState(State previous, State next) { - if (!isTransitionAllowed(previous,next)) + if (!isTransitionAllowed(previous, next)) throw new IllegalStateException(); boolean updated = _state.compareAndSet(previous, next); if (DEBUG) - LOG.debug("update {}:{}{}{}", this, previous, updated?"-->":"!->",next); + LOG.debug("update {}:{}{}{}", this, previous, updated ? "-->" : "!->", next); return updated; } private void fail(PendingState pending) { State current = _state.get(); - if (current.getType()==StateType.FAILED) + if (current.getType() == StateType.FAILED) { - FailedState failed=(FailedState)current; - if (updateState(failed,__IDLE)) + FailedState failed = (FailedState)current; + if (updateState(failed, __IDLE)) { pending.fail(failed.getCause()); return; @@ -138,9 +138,9 @@ abstract public class WriteFlusher private void ignoreFail() { State current = _state.get(); - while (current.getType()==StateType.FAILED) + while (current.getType() == StateType.FAILED) { - if (updateState(current,__IDLE)) + if (updateState(current, __IDLE)) return; current = _state.get(); } @@ -209,10 +209,11 @@ abstract public class WriteFlusher private static class FailedState extends State { private final Throwable _cause; + private FailedState(Throwable cause) { super(StateType.FAILED); - _cause=cause; + _cause = cause; } public Throwable getCause() @@ -257,7 +258,7 @@ abstract public class WriteFlusher protected boolean fail(Throwable cause) { - if (_callback!=null) + if (_callback != null) { _callback.failed(cause); return true; @@ -267,7 +268,7 @@ abstract public class WriteFlusher protected void complete() { - if (_callback!=null) + if (_callback != null) _callback.succeeded(); } @@ -286,8 +287,8 @@ abstract public class WriteFlusher { State s = _state.get(); return (s instanceof PendingState) - ?((PendingState)s).getCallbackInvocationType() - :Invocable.InvocationType.BLOCKING; + ? ((PendingState)s).getCallbackInvocationType() + : Invocable.InvocationType.BLOCKING; } /** @@ -300,13 +301,13 @@ abstract public class WriteFlusher * Tries to switch state to WRITING. If successful it writes the given buffers to the EndPoint. If state transition * fails it'll fail the callback. * - * If not all buffers can be written in one go it creates a new PendingState object to preserve the state + * If not all buffers can be written in one go it creates a new {@code PendingState} object to preserve the state * and then calls {@link #onIncompleteFlush()}. The remaining buffers will be written in {@link #completeWrite()}. * * If all buffers have been written it calls callback.complete(). * * @param callback the callback to call on either failed or complete - * @param buffers the buffers to flush to the endpoint + * @param buffers the buffers to flush to the endpoint * @throws WritePendingException if unable to write due to prior pending write */ public void write(Callback callback, ByteBuffer... buffers) throws WritePendingException @@ -314,20 +315,20 @@ abstract public class WriteFlusher if (DEBUG) LOG.debug("write: {} {}", this, BufferUtil.toDetailString(buffers)); - if (!updateState(__IDLE,__WRITING)) + if (!updateState(__IDLE, __WRITING)) throw new WritePendingException(); try { - buffers=flush(buffers); + buffers = flush(buffers); // if we are incomplete? - if (buffers!=null) + if (buffers != null) { if (DEBUG) LOG.debug("flushed incomplete"); - PendingState pending=new PendingState(buffers, callback); - if (updateState(__WRITING,pending)) + PendingState pending = new PendingState(buffers, callback); + if (updateState(__WRITING, pending)) onIncompleteFlush(); else fail(pending); @@ -335,18 +336,18 @@ abstract public class WriteFlusher } // If updateState didn't succeed, we don't care as our buffers have been written - if (!updateState(__WRITING,__IDLE)) + if (!updateState(__WRITING, __IDLE)) ignoreFail(); - if (callback!=null) + if (callback != null) callback.succeeded(); } catch (IOException e) { if (DEBUG) LOG.debug("write exception", e); - if (updateState(__WRITING,__IDLE)) + if (updateState(__WRITING, __IDLE)) { - if (callback!=null) + if (callback != null) callback.failed(e); } else @@ -354,7 +355,6 @@ abstract public class WriteFlusher } } - /** * Complete a write that has not completed and that called {@link #onIncompleteFlush()} to request a call to this * method when a call to {@link EndPoint#flush(ByteBuffer...)} is likely to be able to progress. @@ -370,27 +370,27 @@ abstract public class WriteFlusher State previous = _state.get(); - if (previous.getType()!=StateType.PENDING) + if (previous.getType() != StateType.PENDING) return; // failure already handled. PendingState pending = (PendingState)previous; - if (!updateState(pending,__COMPLETING)) + if (!updateState(pending, __COMPLETING)) return; // failure already handled. try { ByteBuffer[] buffers = pending.getBuffers(); - buffers=flush(buffers); + buffers = flush(buffers); // if we are incomplete? - if (buffers!=null) + if (buffers != null) { if (DEBUG) - LOG.debug("flushed incomplete {}",BufferUtil.toDetailString(buffers)); - if (buffers!=pending.getBuffers()) - pending=new PendingState(buffers, pending._callback); - if (updateState(__COMPLETING,pending)) + LOG.debug("flushed incomplete {}", BufferUtil.toDetailString(buffers)); + if (buffers != pending.getBuffers()) + pending = new PendingState(buffers, pending._callback); + if (updateState(__COMPLETING, pending)) onIncompleteFlush(); else fail(pending); @@ -398,7 +398,7 @@ abstract public class WriteFlusher } // If updateState didn't succeed, we don't care as our buffers have been written - if (!updateState(__COMPLETING,__IDLE)) + if (!updateState(__COMPLETING, __IDLE)) ignoreFail(); pending.complete(); } @@ -406,7 +406,7 @@ abstract public class WriteFlusher { if (DEBUG) LOG.debug("completeWrite exception", e); - if(updateState(__COMPLETING,__IDLE)) + if (updateState(__COMPLETING, __IDLE)) pending.fail(e); else fail(pending); @@ -422,59 +422,84 @@ abstract public class WriteFlusher */ protected ByteBuffer[] flush(ByteBuffer[] buffers) throws IOException { - boolean progress=true; - while(progress && buffers!=null) + boolean progress = true; + while (progress && buffers != null) { - int before=buffers.length==0?0:buffers[0].remaining(); - boolean flushed=_endPoint.flush(buffers); - int r=buffers.length==0?0:buffers[0].remaining(); + long before = remaining(buffers); + boolean flushed = _endPoint.flush(buffers); + long after = remaining(buffers); + long written = before - after; if (LOG.isDebugEnabled()) - LOG.debug("Flushed={} {}/{}+{} {}",flushed,before-r,before,buffers.length-1,this); + LOG.debug("Flushed={} written={} remaining={} {}", flushed, written, after, this); + + if (written > 0) + { + Connection connection = _endPoint.getConnection(); + if (connection instanceof Listener) + ((Listener)connection).onFlushed(written); + } if (flushed) return null; - progress=before!=r; + progress = written > 0; - int not_empty=0; - while(r==0) + int index = 0; + while (true) { - if (++not_empty==buffers.length) + if (index == buffers.length) { - buffers=null; - not_empty=0; + // All buffers consumed. + buffers = null; + index = 0; break; } - progress=true; - r=buffers[not_empty].remaining(); + else + { + int remaining = buffers[index].remaining(); + if (remaining > 0) + break; + ++index; + progress = true; + } } - - if (not_empty>0) - buffers=Arrays.copyOfRange(buffers,not_empty,buffers.length); + if (index > 0) + buffers = Arrays.copyOfRange(buffers, index, buffers.length); } if (LOG.isDebugEnabled()) - LOG.debug("!fully flushed {}",this); + LOG.debug("!fully flushed {}", this); // If buffers is null, then flush has returned false but has consumed all the data! // This is probably SSL being unable to flush the encrypted buffer, so return EMPTY_BUFFERS // and that will keep this WriteFlusher pending. - return buffers==null?EMPTY_BUFFERS:buffers; + return buffers == null ? EMPTY_BUFFERS : buffers; } - /* ------------------------------------------------------------ */ - /** Notify the flusher of a failure + private long remaining(ByteBuffer[] buffers) + { + if (buffers == null) + return 0; + long result = 0; + for (ByteBuffer buffer : buffers) + result += buffer.remaining(); + return result; + } + + /** + * Notify the flusher of a failure + * * @param cause The cause of the failure * @return true if the flusher passed the failure to a {@link Callback} instance */ public boolean onFail(Throwable cause) { // Keep trying to handle the failure until we get to IDLE or FAILED state - while(true) + while (true) { - State current=_state.get(); - switch(current.getType()) + State current = _state.get(); + switch (current.getType()) { case IDLE: case FAILED: @@ -487,7 +512,7 @@ abstract public class WriteFlusher LOG.debug("failed: " + this, cause); PendingState pending = (PendingState)current; - if (updateState(pending,__IDLE)) + if (updateState(pending, __IDLE)) return pending.fail(cause); break; @@ -495,7 +520,7 @@ abstract public class WriteFlusher if (DEBUG) LOG.debug("failed: " + this, cause); - if (updateState(current,new FailedState(cause))) + if (updateState(current, new FailedState(cause))) return false; break; } @@ -512,29 +537,9 @@ abstract public class WriteFlusher return _state.get().getType() == StateType.IDLE; } - public boolean isInProgress() - { - switch(_state.get().getType()) - { - case WRITING: - case PENDING: - case COMPLETING: - return true; - default: - return false; - } - } - - @Override - public String toString() - { - State s = _state.get(); - return String.format("WriteFlusher@%x{%s}->%s", hashCode(), s,s instanceof PendingState?((PendingState)s).getCallback():null); - } - public String toStateString() { - switch(_state.get().getType()) + switch (_state.get().getType()) { case WRITING: return "W"; @@ -550,4 +555,33 @@ abstract public class WriteFlusher return "?"; } } + + @Override + public String toString() + { + State s = _state.get(); + return String.format("WriteFlusher@%x{%s}->%s", hashCode(), s, s instanceof PendingState ? ((PendingState)s).getCallback() : null); + } + + /** + *

A listener of {@link WriteFlusher} events.

+ */ + public interface Listener + { + /** + *

Invoked when a {@link WriteFlusher} flushed bytes in a non-blocking way, + * as part of a - possibly larger - write.

+ *

This method may be invoked multiple times, for example when writing a large + * buffer: a first flush of bytes, then the connection became TCP congested, and + * a subsequent flush of bytes when the connection became writable again.

+ *

This method is never invoked concurrently, but may be invoked by different + * threads, so implementations may not rely on thread-local variables.

+ *

Implementations may throw an {@link IOException} to signal that the write + * should fail, for example if the implementation enforces a minimum data rate.

+ * + * @param bytes the number of bytes flushed + * @throws IOException if the write should fail + */ + void onFlushed(long bytes) throws IOException; + } } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConfiguration.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConfiguration.java index e39da219bd6..37566401a15 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConfiguration.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConfiguration.java @@ -66,6 +66,7 @@ public class HttpConfiguration private boolean _persistentConnectionsEnabled = true; private int _maxErrorDispatches = 10; private long _minRequestDataRate; + private long _minResponseDataRate; private CookieCompliance _cookieCompliance = CookieCompliance.RFC6265; private boolean _notifyRemoteAsyncErrors = true; @@ -127,6 +128,7 @@ public class HttpConfiguration _persistentConnectionsEnabled=config._persistentConnectionsEnabled; _maxErrorDispatches=config._maxErrorDispatches; _minRequestDataRate=config._minRequestDataRate; + _minResponseDataRate=config._minResponseDataRate; _cookieCompliance=config._cookieCompliance; _notifyRemoteAsyncErrors=config._notifyRemoteAsyncErrors; } @@ -496,7 +498,29 @@ public class HttpConfiguration { _minRequestDataRate=bytesPerSecond; } - + + /** + * @return The minimum response data rate in bytes per second; or <=0 for no limit + */ + @ManagedAttribute("The minimum response content data rate in bytes per second") + public long getMinResponseDataRate() + { + return _minResponseDataRate; + } + + /** + *

Sets an minimum response content data rate.

+ *

The value is enforced only approximately - not precisely - due to the fact that + * for efficiency reasons buffer writes may be comprised of both response headers and + * response content.

+ * + * @param bytesPerSecond The minimum response data rate in bytes per second; or <=0 for no limit + */ + public void setMinResponseDataRate(long bytesPerSecond) + { + _minResponseDataRate = bytesPerSecond; + } + public CookieCompliance getCookieCompliance() { return _cookieCompliance; diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java index b7f51632dd7..f1d11f2db66 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java @@ -40,6 +40,7 @@ import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EofException; +import org.eclipse.jetty.io.WriteFlusher; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.IteratingCallback; @@ -49,7 +50,7 @@ import org.eclipse.jetty.util.log.Logger; /** *

A {@link Connection} that handles the HTTP protocol.

*/ -public class HttpConnection extends AbstractConnection implements Runnable, HttpTransport, Connection.UpgradeFrom +public class HttpConnection extends AbstractConnection implements Runnable, HttpTransport, Connection.UpgradeFrom, WriteFlusher.Listener { private static final Logger LOG = Log.getLogger(HttpConnection.class); public static final HttpField CONNECTION_CLOSE = new PreEncodedHttpField(HttpHeader.CONNECTION,HttpHeaderValue.CLOSE.asString()); @@ -195,6 +196,14 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http return null; } + @Override + public void onFlushed(long bytes) throws IOException + { + // Unfortunately cannot distinguish between header and content + // bytes, and for content bytes whether they are chunked or not. + _channel.getResponse().getHttpOutput().onFlushed(bytes); + } + void releaseRequestBuffer() { if (_requestBuffer != null && !_requestBuffer.hasRemaining()) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java index 47f9655430d..504ab78d062 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java @@ -282,7 +282,7 @@ public class HttpInput extends ServletInputStream implements Runnable { long minimum_data = minRequestDataRate * TimeUnit.NANOSECONDS.toMillis(period) / TimeUnit.SECONDS.toMillis(1); if (_contentArrived < minimum_data) - throw new BadMessageException(HttpStatus.REQUEST_TIMEOUT_408,String.format("Request data rate < %d B/s",minRequestDataRate)); + throw new BadMessageException(HttpStatus.REQUEST_TIMEOUT_408,String.format("Request content data rate < %d B/s",minRequestDataRate)); } } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java index 7c6691a7c01..ba07598ad7b 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java @@ -24,6 +24,7 @@ import java.io.InputStream; import java.nio.ByteBuffer; import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritePendingException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import javax.servlet.RequestDispatcher; @@ -122,12 +123,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable private final HttpChannel _channel; private final SharedBlockingCallback _writeBlocker; private Interceptor _interceptor; - - /** - * Bytes written via the write API (excludes bytes written via sendContent). Used to autocommit once content length is written. - */ private long _written; - + private long _flushed; + private long _firstByteTimeStamp = -1; private ByteBuffer _aggregate; private int _bufferSize; private int _commitSize; @@ -231,6 +229,14 @@ public class HttpOutput extends ServletOutputStream implements Runnable protected void write(ByteBuffer content, boolean complete, Callback callback) { + if (_firstByteTimeStamp == -1) + { + long minDataRate = getHttpChannel().getHttpConfiguration().getMinResponseDataRate(); + if (minDataRate > 0) + _firstByteTimeStamp = System.nanoTime(); + else + _firstByteTimeStamp = Long.MAX_VALUE; + } _interceptor.write(content, complete, callback); } @@ -908,6 +914,30 @@ public class HttpOutput extends ServletOutputStream implements Runnable _commitSize = size; } + /** + *

Invoked when bytes have been flushed to the network.

+ *

The number of flushed bytes may be different from the bytes written + * by the application if an {@link Interceptor} changed them, for example + * by compressing them.

+ * + * @param bytes the number of bytes flushed + * @throws IOException if the minimum data rate, when set, is not respected + * @see org.eclipse.jetty.io.WriteFlusher.Listener + */ + public void onFlushed(long bytes) throws IOException + { + if (_firstByteTimeStamp == -1 || _firstByteTimeStamp == Long.MAX_VALUE) + return; + long minDataRate = getHttpChannel().getHttpConfiguration().getMinResponseDataRate(); + _flushed += bytes; + long elapsed = System.nanoTime() - _firstByteTimeStamp; + long minFlushed = minDataRate * TimeUnit.NANOSECONDS.toMillis(elapsed) / TimeUnit.SECONDS.toMillis(1); + if (LOG.isDebugEnabled()) + LOG.debug("Flushed bytes min/actual {}/{}", minFlushed, _flushed); + if (_flushed < minFlushed) + throw new IOException(String.format("Response content data rate < %d B/s", minDataRate)); + } + public void recycle() { _interceptor = _channel; @@ -920,6 +950,8 @@ public class HttpOutput extends ServletOutputStream implements Runnable _written = 0; _writeListener = null; _onError = null; + _firstByteTimeStamp = -1; + _flushed = 0; reopen(); } diff --git a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/EmptyServerHandler.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/EmptyServerHandler.java index eab1eedf66a..87f1315671c 100644 --- a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/EmptyServerHandler.java +++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/EmptyServerHandler.java @@ -27,11 +27,16 @@ import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.handler.AbstractHandler; -public class EmptyServerHandler extends AbstractHandler +public class EmptyServerHandler extends AbstractHandler.ErrorDispatchHandler { @Override - public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + protected void doNonErrorHandle(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + jettyRequest.setHandled(true); + service(target, jettyRequest, request, response); + } + + protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { - baseRequest.setHandled(true); } } diff --git a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientTest.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientTest.java index b7a020d59e2..409254e6711 100644 --- a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientTest.java +++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientTest.java @@ -37,7 +37,6 @@ import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; -import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.api.ContentResponse; import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.util.BytesContentProvider; @@ -497,9 +496,8 @@ public class HttpClientTest extends AbstractTest start(new EmptyServerHandler() { @Override - public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { - super.handle(target, baseRequest, request, response); response.getWriter().write("Jetty"); } }); diff --git a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/ServerTimeoutsTest.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/ServerTimeoutsTest.java index f7c93c4837a..7acd68c5a95 100644 --- a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/ServerTimeoutsTest.java +++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/ServerTimeoutsTest.java @@ -43,6 +43,8 @@ import org.eclipse.jetty.client.util.BufferingResponseListener; import org.eclipse.jetty.client.util.DeferredContentProvider; import org.eclipse.jetty.http.BadMessageException; import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.http2.FlowControlStrategy; +import org.eclipse.jetty.http2.client.http.HttpClientTransportOverHTTP2; import org.eclipse.jetty.http2.server.AbstractHTTP2ServerConnectionFactory; import org.eclipse.jetty.server.HttpChannel; import org.eclipse.jetty.server.Request; @@ -50,7 +52,9 @@ import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.log.StacklessLogging; +import org.hamcrest.Matchers; import org.junit.Assert; +import org.junit.Assume; import org.junit.Test; public class ServerTimeoutsTest extends AbstractTest @@ -736,6 +740,76 @@ public class ServerTimeoutsTest extends AbstractTest Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); } + @Test + public void testBlockingWriteWithMinimumDataRateBelowLimit() throws Exception + { + // This test needs a large write to stall the server, and a slow reading client. + // In HTTP/1.1, when using the loopback interface, the buffers are so large that + // it would require a very large write (32 MiB) and a lot of time for this test + // to pass. On the first writes, the server fills in the large buffers with a lot + // of bytes (about 4 MiB), and so it would take a lot of time for the client to + // read those bytes and eventually produce a write rate that will make the server + // fail; and the write should be large enough to _not_ complete before the rate + // is below the minimum. + // In HTTP/2, we force the flow control window to be small, so that the server + // stalls almost immediately without having written many bytes, so that the test + // completes quickly. + Assume.assumeThat(transport, Matchers.isOneOf(Transport.H2, Transport.H2C)); + + int bytesPerSecond = 16 * 1024; + httpConfig.setMinResponseDataRate(bytesPerSecond); + CountDownLatch serverLatch = new CountDownLatch(1); + start(new EmptyServerHandler() + { + @Override + protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + try + { + ServletOutputStream output = response.getOutputStream(); + output.write(new byte[8 * 1024 * 1024]); + } + catch (IOException x) + { + serverLatch.countDown(); + } + } + }); + ((HttpClientTransportOverHTTP2)client.getTransport()).getHTTP2Client().setInitialStreamRecvWindow(FlowControlStrategy.DEFAULT_WINDOW_SIZE); + + // Setup the client to read slower than the min data rate. + BlockingQueue objects = new LinkedBlockingQueue<>(); + CountDownLatch clientLatch = new CountDownLatch(1); + client.newRequest(newURI()) + .onResponseContentAsync((response, content, callback) -> + { + objects.offer(content.remaining()); + objects.offer(callback); + }) + .send(result -> + { + objects.offer(-1); + objects.offer(Callback.NOOP); + if (result.isFailed()) + clientLatch.countDown(); + }); + + long readRate = bytesPerSecond / 2; + while (true) + { + int bytes = (Integer)objects.poll(5, TimeUnit.SECONDS); + if (bytes < 0) + break; + long ms = bytes * 1000L / readRate; + Thread.sleep(ms); + Callback callback = (Callback)objects.poll(); + callback.succeeded(); + } + + Assert.assertTrue(serverLatch.await(15, TimeUnit.SECONDS)); + Assert.assertTrue(clientLatch.await(15, TimeUnit.SECONDS)); + } + private static class BlockingReadHandler extends AbstractHandler.ErrorDispatchHandler { private final CountDownLatch handlerLatch;