Merge remote-tracking branch 'origin/jetty-9.4.x'

This commit is contained in:
Greg Wilkins 2017-12-27 16:04:16 +01:00
commit 12647f51e7
43 changed files with 771 additions and 223 deletions

View File

@ -27,6 +27,7 @@ import java.util.regex.Pattern;
import org.eclipse.jetty.client.api.Authentication; import org.eclipse.jetty.client.api.Authentication;
import org.eclipse.jetty.client.api.Connection; import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.ContentProvider;
import org.eclipse.jetty.client.api.ContentResponse; import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Response;
@ -86,7 +87,7 @@ public abstract class AuthenticationProtocolHandler implements ProtocolHandler
{ {
HttpRequest request = (HttpRequest)result.getRequest(); HttpRequest request = (HttpRequest)result.getRequest();
ContentResponse response = new HttpContentResponse(result.getResponse(), getContent(), getMediaType(), getEncoding()); ContentResponse response = new HttpContentResponse(result.getResponse(), getContent(), getMediaType(), getEncoding());
if (result.isFailed()) if (result.getResponseFailure() != null)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Authentication challenge failed {}", result.getFailure()); LOG.debug("Authentication challenge failed {}", result.getFailure());
@ -98,7 +99,7 @@ public abstract class AuthenticationProtocolHandler implements ProtocolHandler
HttpConversation conversation = request.getConversation(); HttpConversation conversation = request.getConversation();
if (conversation.getAttribute(authenticationAttribute) != null) if (conversation.getAttribute(authenticationAttribute) != null)
{ {
// We have already tried to authenticate, but we failed again // We have already tried to authenticate, but we failed again.
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Bad credentials for {}", request); LOG.debug("Bad credentials for {}", request);
forwardSuccessComplete(request, response); forwardSuccessComplete(request, response);
@ -111,7 +112,7 @@ public abstract class AuthenticationProtocolHandler implements ProtocolHandler
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Authentication challenge without {} header", header); LOG.debug("Authentication challenge without {} header", header);
forwardFailureComplete(request, null, response, new HttpResponseException("HTTP protocol violation: Authentication challenge without " + header + " header", response)); forwardFailureComplete(request, result.getRequestFailure(), response, new HttpResponseException("HTTP protocol violation: Authentication challenge without " + header + " header", response));
return; return;
} }
@ -138,9 +139,18 @@ public abstract class AuthenticationProtocolHandler implements ProtocolHandler
return; return;
} }
ContentProvider requestContent = request.getContent();
if (requestContent != null && !requestContent.isReproducible())
{
if (LOG.isDebugEnabled())
LOG.debug("Request content not reproducible for {}", request);
forwardSuccessComplete(request, response);
return;
}
try try
{ {
final Authentication.Result authnResult = authentication.authenticate(request, response, headerInfo, conversation); Authentication.Result authnResult = authentication.authenticate(request, response, headerInfo, conversation);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Authentication result {}", authnResult); LOG.debug("Authentication result {}", authnResult);
if (authnResult == null) if (authnResult == null)

View File

@ -22,6 +22,7 @@ import java.io.Closeable;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Iterator; import java.util.Iterator;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.util.ByteBufferContentProvider; import org.eclipse.jetty.client.util.ByteBufferContentProvider;
import org.eclipse.jetty.client.util.PathContentProvider; import org.eclipse.jetty.client.util.PathContentProvider;
@ -48,6 +49,22 @@ public interface ContentProvider extends Iterable<ByteBuffer>
*/ */
long getLength(); long getLength();
/**
* <p>Whether this ContentProvider can produce exactly the same content more
* than once.</p>
* <p>Implementations should return {@code true} only if the content can be
* produced more than once, which means that invocations to {@link #iterator()}
* must return a new, independent, iterator instance over the content.</p>
* <p>The {@link HttpClient} implementation may use this method in particular
* cases where it detects that it is safe to retry a request that failed.</p>
*
* @return whether the content can be produced more than once
*/
default boolean isReproducible()
{
return false;
}
/** /**
* An extension of {@link ContentProvider} that provides a content type string * An extension of {@link ContentProvider} that provides a content type string
* to be used as a {@code Content-Type} HTTP header in requests. * to be used as a {@code Content-Type} HTTP header in requests.

View File

@ -57,6 +57,12 @@ public class ByteBufferContentProvider extends AbstractTypedContentProvider
return length; return length;
} }
@Override
public boolean isReproducible()
{
return true;
}
@Override @Override
public Iterator<ByteBuffer> iterator() public Iterator<ByteBuffer> iterator()
{ {
@ -85,12 +91,6 @@ public class ByteBufferContentProvider extends AbstractTypedContentProvider
throw new NoSuchElementException(); throw new NoSuchElementException();
} }
} }
@Override
public void remove()
{
throw new UnsupportedOperationException();
}
}; };
} }
} }

View File

@ -53,6 +53,12 @@ public class BytesContentProvider extends AbstractTypedContentProvider
return length; return length;
} }
@Override
public boolean isReproducible()
{
return true;
}
@Override @Override
public Iterator<ByteBuffer> iterator() public Iterator<ByteBuffer> iterator()
{ {
@ -78,12 +84,6 @@ public class BytesContentProvider extends AbstractTypedContentProvider
throw new NoSuchElementException(); throw new NoSuchElementException();
} }
} }
@Override
public void remove()
{
throw new UnsupportedOperationException();
}
}; };
} }
} }

View File

@ -85,6 +85,12 @@ public class PathContentProvider extends AbstractTypedContentProvider
return fileSize; return fileSize;
} }
@Override
public boolean isReproducible()
{
return true;
}
public ByteBufferPool getByteBufferPool() public ByteBufferPool getByteBufferPool()
{ {
return bufferPool; return bufferPool;

View File

@ -27,6 +27,7 @@ import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.toolchain.test.TestTracker; import org.eclipse.jetty.toolchain.test.TestTracker;
import org.eclipse.jetty.util.SocketAddressResolver;
import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.After; import org.junit.After;
@ -96,6 +97,7 @@ public abstract class AbstractHttpClientServerTest
clientThreads.setName("client"); clientThreads.setName("client");
client = new HttpClient(transport, sslContextFactory); client = new HttpClient(transport, sslContextFactory);
client.setExecutor(clientThreads); client.setExecutor(clientThreads);
client.setSocketAddressResolver(new SocketAddressResolver.Sync());
client.start(); client.start();
} }

View File

@ -22,11 +22,14 @@ import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets; import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.IntFunction;
import javax.servlet.ServletException; import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
@ -34,6 +37,7 @@ import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.api.Authentication; import org.eclipse.jetty.client.api.Authentication;
import org.eclipse.jetty.client.api.AuthenticationStore; import org.eclipse.jetty.client.api.AuthenticationStore;
import org.eclipse.jetty.client.api.ContentProvider;
import org.eclipse.jetty.client.api.ContentResponse; import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Response;
@ -41,6 +45,7 @@ import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.util.BasicAuthentication; import org.eclipse.jetty.client.util.BasicAuthentication;
import org.eclipse.jetty.client.util.DeferredContentProvider; import org.eclipse.jetty.client.util.DeferredContentProvider;
import org.eclipse.jetty.client.util.DigestAuthentication; import org.eclipse.jetty.client.util.DigestAuthentication;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.security.Authenticator; import org.eclipse.jetty.security.Authenticator;
import org.eclipse.jetty.security.ConstraintMapping; import org.eclipse.jetty.security.ConstraintMapping;
import org.eclipse.jetty.security.ConstraintSecurityHandler; import org.eclipse.jetty.security.ConstraintSecurityHandler;
@ -220,7 +225,7 @@ public class HttpClientAuthenticationTest extends AbstractHttpClientServerTest
{ {
baseRequest.setHandled(true); baseRequest.setHandled(true);
if (requests.incrementAndGet() == 1) if (requests.incrementAndGet() == 1)
response.sendRedirect(URIUtil.newURI(scheme,request.getServerName(),request.getServerPort(),request.getRequestURI(),null)); response.sendRedirect(URIUtil.newURI(scheme, request.getServerName(), request.getServerPort(), request.getRequestURI(), null));
} }
}); });
@ -259,7 +264,7 @@ public class HttpClientAuthenticationTest extends AbstractHttpClientServerTest
{ {
baseRequest.setHandled(true); baseRequest.setHandled(true);
if (request.getRequestURI().endsWith("/redirect")) if (request.getRequestURI().endsWith("/redirect"))
response.sendRedirect(URIUtil.newURI(scheme,request.getServerName(),request.getServerPort(),"/secure",null)); response.sendRedirect(URIUtil.newURI(scheme, request.getServerName(), request.getServerPort(), "/secure", null));
} }
}); });
@ -424,6 +429,40 @@ public class HttpClientAuthenticationTest extends AbstractHttpClientServerTest
Assert.assertEquals(1, requests.get()); Assert.assertEquals(1, requests.get());
} }
@Test
public void test_NonReproducibleContent() throws Exception
{
startBasic(new EmptyServerHandler());
AuthenticationStore authenticationStore = client.getAuthenticationStore();
URI uri = URI.create(scheme + "://localhost:" + connector.getLocalPort());
BasicAuthentication authentication = new BasicAuthentication(uri, realm, "basic", "basic");
authenticationStore.addAuthentication(authentication);
CountDownLatch resultLatch = new CountDownLatch(1);
byte[] data = new byte[]{'h', 'e', 'l', 'l', 'o'};
DeferredContentProvider content = new DeferredContentProvider(ByteBuffer.wrap(data))
{
@Override
public boolean isReproducible()
{
return false;
}
};
Request request = client.newRequest(uri)
.path("/secure")
.content(content);
request.send(result ->
{
if (result.isSucceeded() && result.getResponse().getStatus() == HttpStatus.UNAUTHORIZED_401)
resultLatch.countDown();
});
content.close();
Assert.assertTrue(resultLatch.await(5, TimeUnit.SECONDS));
}
@Test @Test
public void test_RequestFailsAfterResponse() throws Exception public void test_RequestFailsAfterResponse() throws Exception
{ {
@ -434,32 +473,111 @@ public class HttpClientAuthenticationTest extends AbstractHttpClientServerTest
BasicAuthentication authentication = new BasicAuthentication(uri, realm, "basic", "basic"); BasicAuthentication authentication = new BasicAuthentication(uri, realm, "basic", "basic");
authenticationStore.addAuthentication(authentication); authenticationStore.addAuthentication(authentication);
CountDownLatch successLatch = new CountDownLatch(1); AtomicBoolean fail = new AtomicBoolean(true);
GeneratingContentProvider content = new GeneratingContentProvider(index ->
{
switch (index)
{
case 0:
return ByteBuffer.wrap(new byte[]{'h', 'e', 'l', 'l', 'o'});
case 1:
return ByteBuffer.wrap(new byte[]{'w', 'o', 'r', 'l', 'd'});
case 2:
if (fail.compareAndSet(true, false))
{
// Wait for the 401 response to arrive
// to the authentication protocol handler.
sleep(1000);
// Trigger request failure.
throw new RuntimeException();
}
else
{
return null;
}
default:
throw new IllegalStateException();
}
});
CountDownLatch resultLatch = new CountDownLatch(1); CountDownLatch resultLatch = new CountDownLatch(1);
DeferredContentProvider content = new DeferredContentProvider(); client.newRequest("localhost", connector.getLocalPort())
Request request = client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme) .scheme(scheme)
.path("/secure") .path("/secure")
.content(content) .content(content)
.onResponseSuccess(response -> successLatch.countDown()); .send(result ->
request.send(result -> {
{ if (result.isSucceeded() && result.getResponse().getStatus() == HttpStatus.OK_200)
if (result.isFailed() && result.getResponseFailure() == null) resultLatch.countDown();
resultLatch.countDown(); });
});
// Send some content to make sure the request is dispatched on the server.
content.offer(ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8)));
// Wait for the response to arrive to
// the authentication protocol handler.
Thread.sleep(1000);
// Trigger request failure.
request.abort(new Exception());
// Verify that the response was successful, it's the request that failed.
Assert.assertTrue(successLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(resultLatch.await(5, TimeUnit.SECONDS)); Assert.assertTrue(resultLatch.await(5, TimeUnit.SECONDS));
} }
private void sleep(long time)
{
try
{
Thread.sleep(time);
}
catch (InterruptedException x)
{
throw new RuntimeException(x);
}
}
private static class GeneratingContentProvider implements ContentProvider
{
private static final ByteBuffer DONE = ByteBuffer.allocate(0);
private final IntFunction<ByteBuffer> generator;
private GeneratingContentProvider(IntFunction<ByteBuffer> generator)
{
this.generator = generator;
}
@Override
public long getLength()
{
return -1;
}
@Override
public boolean isReproducible()
{
return true;
}
@Override
public Iterator<ByteBuffer> iterator()
{
return new Iterator<ByteBuffer>()
{
private int index;
public ByteBuffer current;
@Override
public boolean hasNext()
{
if (current == null)
{
current = generator.apply(index++);
if (current == null)
current = DONE;
}
return current != DONE;
}
@Override
public ByteBuffer next()
{
ByteBuffer result = current;
current = null;
if (result == null)
throw new NoSuchElementException();
return result;
}
};
}
}
} }

View File

@ -17,7 +17,7 @@
[[jetty-websocket-api-send-message]] [[jetty-websocket-api-send-message]]
=== Send Messages to Remote Endpoint === Send Messages to Remote Endpoint
The most important feature of the Session is access to the link:{JDURL}/org/eclipse/jetty/websocket/api/RemoteEndpoint.html[`org.eclipse.jetty.websocket.api.RemoteEndpoint`]needed to send messages. The most important feature of the Session is access to the link:{JDURL}/org/eclipse/jetty/websocket/api/RemoteEndpoint.html[`org.eclipse.jetty.websocket.api.RemoteEndpoint`] needed to send messages.
With RemoteEndpoint you can choose to send TEXT or BINARY WebSocket messages, or the WebSocket PING and PONG control frames. With RemoteEndpoint you can choose to send TEXT or BINARY WebSocket messages, or the WebSocket PING and PONG control frames.

View File

@ -44,7 +44,7 @@ What was in the Upgrade Request and Response.
UpgradeRequest req = session.getUpgradeRequest(); UpgradeRequest req = session.getUpgradeRequest();
String channelName = req.getParameterMap().get("channelName"); String channelName = req.getParameterMap().get("channelName");
UpgradeRespons resp = session.getUpgradeResponse(); UpgradeResponse resp = session.getUpgradeResponse();
String subprotocol = resp.getAcceptedSubProtocol(); String subprotocol = resp.getAcceptedSubProtocol();
---- ----

View File

@ -121,6 +121,11 @@ public class HazelcastSessionDataStoreFactory
return onlyClient; return onlyClient;
} }
/**
*
* @param onlyClient if <code>true</code> the session manager will only connect to an external Hazelcast instance
* and not use this JVM to start an Hazelcast instance
*/
public void setOnlyClient( boolean onlyClient ) public void setOnlyClient( boolean onlyClient )
{ {
this.onlyClient = onlyClient; this.onlyClient = onlyClient;

View File

@ -28,6 +28,7 @@ import org.eclipse.jetty.http2.parser.Parser;
import org.eclipse.jetty.io.AbstractConnection; import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.WriteFlusher;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.component.LifeCycle; import org.eclipse.jetty.util.component.LifeCycle;
@ -37,7 +38,7 @@ import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.ReservedThreadExecutor; import org.eclipse.jetty.util.thread.ReservedThreadExecutor;
import org.eclipse.jetty.util.thread.strategy.EatWhatYouKill; import org.eclipse.jetty.util.thread.strategy.EatWhatYouKill;
public class HTTP2Connection extends AbstractConnection public class HTTP2Connection extends AbstractConnection implements WriteFlusher.Listener
{ {
protected static final Logger LOG = Log.getLogger(HTTP2Connection.class); protected static final Logger LOG = Log.getLogger(HTTP2Connection.class);
@ -176,6 +177,13 @@ public class HTTP2Connection extends AbstractConnection
} }
} }
@Override
public void onFlushed(long bytes) throws IOException
{
// TODO: add method to ISession ?
((HTTP2Session)session).onFlushed(bytes);
}
protected class HTTP2Producer implements ExecutionStrategy.Producer protected class HTTP2Producer implements ExecutionStrategy.Producer
{ {
private final Callback fillableCallback = new FillableCallback(); private final Callback fillableCallback = new FillableCallback();

View File

@ -30,6 +30,7 @@ import org.eclipse.jetty.http2.frames.Frame;
import org.eclipse.jetty.http2.frames.WindowUpdateFrame; import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EofException; import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.WriteFlusher;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback; import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.component.ContainerLifeCycle; import org.eclipse.jetty.util.component.ContainerLifeCycle;
@ -175,7 +176,7 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
{ {
if (entry.generate(lease)) if (entry.generate(lease))
{ {
if (entry.dataRemaining() > 0) if (entry.getDataBytesRemaining() > 0)
entries.offer(entry); entries.offer(entry);
} }
else else
@ -207,6 +208,31 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
return Action.SCHEDULED; return Action.SCHEDULED;
} }
void onFlushed(long bytes) throws IOException
{
// For the given flushed bytes, we want to only
// forward those that belong to data frame content.
for (Entry entry : actives)
{
int frameBytesLeft = entry.getFrameBytesRemaining();
if (frameBytesLeft > 0)
{
int update = (int)Math.min(bytes, frameBytesLeft);
entry.onFrameBytesFlushed(update);
bytes -= update;
IStream stream = entry.stream;
if (stream != null && !entry.isControl())
{
Object channel = stream.getAttribute(IStream.CHANNEL_ATTRIBUTE);
if (channel instanceof WriteFlusher.Listener)
((WriteFlusher.Listener)channel).onFlushed(update - Frame.HEADER_LENGTH);
}
if (bytes == 0)
break;
}
}
}
@Override @Override
public void succeeded() public void succeeded()
{ {
@ -234,13 +260,13 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
for (int i = index; i < actives.size(); ++i) for (int i = index; i < actives.size(); ++i)
{ {
Entry entry = actives.get(i); Entry entry = actives.get(i);
if (entry.dataRemaining() > 0) if (entry.getDataBytesRemaining() > 0)
append(entry); append(entry);
} }
for (int i = 0; i < index; ++i) for (int i = 0; i < index; ++i)
{ {
Entry entry = actives.get(i); Entry entry = actives.get(i);
if (entry.dataRemaining() > 0) if (entry.getDataBytesRemaining() > 0)
append(entry); append(entry);
} }
stalled = null; stalled = null;
@ -333,7 +359,11 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
this.stream = stream; this.stream = stream;
} }
public int dataRemaining() public abstract int getFrameBytesRemaining();
public abstract void onFrameBytesFlushed(int bytesFlushed);
public int getDataBytesRemaining()
{ {
return 0; return 0;
} }
@ -387,6 +417,17 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
} }
} }
private boolean isControl()
{
switch (frame.getType())
{
case DATA:
return false;
default:
return true;
}
}
@Override @Override
public String toString() public String toString()
{ {

View File

@ -955,6 +955,11 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
{ {
} }
void onFlushed(long bytes) throws IOException
{
flusher.onFlushed(bytes);
}
public void disconnect() public void disconnect()
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
@ -1132,15 +1137,28 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
private class ControlEntry extends HTTP2Flusher.Entry private class ControlEntry extends HTTP2Flusher.Entry
{ {
private int bytes; private int bytes;
private int frameBytes;
private ControlEntry(Frame frame, IStream stream, Callback callback) private ControlEntry(Frame frame, IStream stream, Callback callback)
{ {
super(frame, stream, callback); super(frame, stream, callback);
} }
@Override
public int getFrameBytesRemaining()
{
return frameBytes;
}
@Override
public void onFrameBytesFlushed(int bytesFlushed)
{
frameBytes -= bytesFlushed;
}
protected boolean generate(ByteBufferPool.Lease lease) protected boolean generate(ByteBufferPool.Lease lease)
{ {
bytes = generator.control(lease, frame); bytes = frameBytes = generator.control(lease, frame);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Generated {}", frame); LOG.debug("Generated {}", frame);
prepare(); prepare();
@ -1238,7 +1256,8 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
private class DataEntry extends HTTP2Flusher.Entry private class DataEntry extends HTTP2Flusher.Entry
{ {
private int bytes; private int bytes;
private int dataRemaining; private int frameBytes;
private int dataBytes;
private int dataWritten; private int dataWritten;
private DataEntry(DataFrame frame, IStream stream, Callback callback) private DataEntry(DataFrame frame, IStream stream, Callback callback)
@ -1249,35 +1268,47 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
// of data frames that cannot be completely written due to // of data frames that cannot be completely written due to
// the flow control window exhausting, since in that case // the flow control window exhausting, since in that case
// we would have to count the padding only once. // we would have to count the padding only once.
dataRemaining = frame.remaining(); dataBytes = frame.remaining();
} }
@Override @Override
public int dataRemaining() public int getFrameBytesRemaining()
{ {
return dataRemaining; return frameBytes;
}
@Override
public void onFrameBytesFlushed(int bytesFlushed)
{
frameBytes -= bytesFlushed;
}
@Override
public int getDataBytesRemaining()
{
return dataBytes;
} }
protected boolean generate(ByteBufferPool.Lease lease) protected boolean generate(ByteBufferPool.Lease lease)
{ {
int dataRemaining = dataRemaining(); int dataBytes = getDataBytesRemaining();
int sessionSendWindow = getSendWindow(); int sessionSendWindow = getSendWindow();
int streamSendWindow = stream.updateSendWindow(0); int streamSendWindow = stream.updateSendWindow(0);
int window = Math.min(streamSendWindow, sessionSendWindow); int window = Math.min(streamSendWindow, sessionSendWindow);
if (window <= 0 && dataRemaining > 0) if (window <= 0 && dataBytes > 0)
return false; return false;
int length = Math.min(dataRemaining, window); int length = Math.min(dataBytes, window);
// Only one DATA frame is generated. // Only one DATA frame is generated.
bytes = generator.data(lease, (DataFrame)frame, length); bytes = frameBytes = generator.data(lease, (DataFrame)frame, length);
int written = bytes - Frame.HEADER_LENGTH; int written = bytes - Frame.HEADER_LENGTH;
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Generated {}, length/window/data={}/{}/{}", frame, written, window, dataRemaining); LOG.debug("Generated {}, length/window/data={}/{}/{}", frame, written, window, dataBytes);
this.dataWritten = written; this.dataWritten = written;
this.dataRemaining -= written; this.dataBytes -= written;
flowControl.onDataSending(stream, written); flowControl.onDataSending(stream, written);
@ -1292,7 +1323,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
// Do we have more to send ? // Do we have more to send ?
DataFrame dataFrame = (DataFrame)frame; DataFrame dataFrame = (DataFrame)frame;
if (dataRemaining() == 0) if (getDataBytesRemaining() == 0)
{ {
// Only now we can update the close state // Only now we can update the close state
// and eventually remove the stream. // and eventually remove the stream.

View File

@ -62,6 +62,11 @@ public class HttpClientTransportOverHTTP2 extends AbstractHttpClientTransport
}); });
} }
public HTTP2Client getHTTP2Client()
{
return client;
}
@ManagedAttribute(value = "The number of selectors", readonly = true) @ManagedAttribute(value = "The number of selectors", readonly = true)
public int getSelectors() public int getSelectors()
{ {

View File

@ -38,6 +38,7 @@ import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame; import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.WriteFlusher;
import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpChannel; import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.server.HttpConfiguration;
@ -48,7 +49,7 @@ import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable, WriteFlusher.Listener
{ {
private static final Logger LOG = Log.getLogger(HttpChannelOverHTTP2.class); private static final Logger LOG = Log.getLogger(HttpChannelOverHTTP2.class);
private static final HttpField SERVER_VERSION = new PreEncodedHttpField(HttpHeader.SERVER, HttpConfiguration.SERVER_VERSION); private static final HttpField SERVER_VERSION = new PreEncodedHttpField(HttpHeader.SERVER, HttpConfiguration.SERVER_VERSION);
@ -85,6 +86,12 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable
return getStream().getIdleTimeout(); return getStream().getIdleTimeout();
} }
@Override
public void onFlushed(long bytes) throws IOException
{
getResponse().getHttpOutput().onFlushed(bytes);
}
public Runnable onRequest(HeadersFrame frame) public Runnable onRequest(HeadersFrame frame)
{ {
try try

View File

@ -35,7 +35,6 @@ import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Invocable; import org.eclipse.jetty.util.thread.Invocable;
import org.eclipse.jetty.util.thread.Invocable.InvocationType; import org.eclipse.jetty.util.thread.Invocable.InvocationType;
/** /**
* A Utility class to help implement {@link EndPoint#write(Callback, ByteBuffer...)} by calling * A Utility class to help implement {@link EndPoint#write(Callback, ByteBuffer...)} by calling
* {@link EndPoint#flush(ByteBuffer...)} until all content is written. * {@link EndPoint#flush(ByteBuffer...)} until all content is written.
@ -60,7 +59,7 @@ abstract public class WriteFlusher
// fill the state machine // fill the state machine
__stateTransitions.put(StateType.IDLE, EnumSet.of(StateType.WRITING)); __stateTransitions.put(StateType.IDLE, EnumSet.of(StateType.WRITING));
__stateTransitions.put(StateType.WRITING, EnumSet.of(StateType.IDLE, StateType.PENDING, StateType.FAILED)); __stateTransitions.put(StateType.WRITING, EnumSet.of(StateType.IDLE, StateType.PENDING, StateType.FAILED));
__stateTransitions.put(StateType.PENDING, EnumSet.of(StateType.COMPLETING,StateType.IDLE)); __stateTransitions.put(StateType.PENDING, EnumSet.of(StateType.COMPLETING, StateType.IDLE));
__stateTransitions.put(StateType.COMPLETING, EnumSet.of(StateType.IDLE, StateType.PENDING, StateType.FAILED)); __stateTransitions.put(StateType.COMPLETING, EnumSet.of(StateType.IDLE, StateType.PENDING, StateType.FAILED));
__stateTransitions.put(StateType.FAILED, EnumSet.of(StateType.IDLE)); __stateTransitions.put(StateType.FAILED, EnumSet.of(StateType.IDLE));
} }
@ -104,29 +103,30 @@ abstract public class WriteFlusher
/** /**
* Tries to update the current state to the given new state. * Tries to update the current state to the given new state.
*
* @param previous the expected current state * @param previous the expected current state
* @param next the desired new state * @param next the desired new state
* @return the previous state or null if the state transition failed * @return the previous state or null if the state transition failed
* @throws WritePendingException if currentState is WRITING and new state is WRITING (api usage error) * @throws WritePendingException if currentState is WRITING and new state is WRITING (api usage error)
*/ */
private boolean updateState(State previous,State next) private boolean updateState(State previous, State next)
{ {
if (!isTransitionAllowed(previous,next)) if (!isTransitionAllowed(previous, next))
throw new IllegalStateException(); throw new IllegalStateException();
boolean updated = _state.compareAndSet(previous, next); boolean updated = _state.compareAndSet(previous, next);
if (DEBUG) if (DEBUG)
LOG.debug("update {}:{}{}{}", this, previous, updated?"-->":"!->",next); LOG.debug("update {}:{}{}{}", this, previous, updated ? "-->" : "!->", next);
return updated; return updated;
} }
private void fail(PendingState pending) private void fail(PendingState pending)
{ {
State current = _state.get(); State current = _state.get();
if (current.getType()==StateType.FAILED) if (current.getType() == StateType.FAILED)
{ {
FailedState failed=(FailedState)current; FailedState failed = (FailedState)current;
if (updateState(failed,__IDLE)) if (updateState(failed, __IDLE))
{ {
pending.fail(failed.getCause()); pending.fail(failed.getCause());
return; return;
@ -138,9 +138,9 @@ abstract public class WriteFlusher
private void ignoreFail() private void ignoreFail()
{ {
State current = _state.get(); State current = _state.get();
while (current.getType()==StateType.FAILED) while (current.getType() == StateType.FAILED)
{ {
if (updateState(current,__IDLE)) if (updateState(current, __IDLE))
return; return;
current = _state.get(); current = _state.get();
} }
@ -209,10 +209,11 @@ abstract public class WriteFlusher
private static class FailedState extends State private static class FailedState extends State
{ {
private final Throwable _cause; private final Throwable _cause;
private FailedState(Throwable cause) private FailedState(Throwable cause)
{ {
super(StateType.FAILED); super(StateType.FAILED);
_cause=cause; _cause = cause;
} }
public Throwable getCause() public Throwable getCause()
@ -257,7 +258,7 @@ abstract public class WriteFlusher
protected boolean fail(Throwable cause) protected boolean fail(Throwable cause)
{ {
if (_callback!=null) if (_callback != null)
{ {
_callback.failed(cause); _callback.failed(cause);
return true; return true;
@ -267,7 +268,7 @@ abstract public class WriteFlusher
protected void complete() protected void complete()
{ {
if (_callback!=null) if (_callback != null)
_callback.succeeded(); _callback.succeeded();
} }
@ -286,8 +287,8 @@ abstract public class WriteFlusher
{ {
State s = _state.get(); State s = _state.get();
return (s instanceof PendingState) return (s instanceof PendingState)
?((PendingState)s).getCallbackInvocationType() ? ((PendingState)s).getCallbackInvocationType()
:Invocable.InvocationType.BLOCKING; : Invocable.InvocationType.BLOCKING;
} }
/** /**
@ -300,13 +301,13 @@ abstract public class WriteFlusher
* Tries to switch state to WRITING. If successful it writes the given buffers to the EndPoint. If state transition * Tries to switch state to WRITING. If successful it writes the given buffers to the EndPoint. If state transition
* fails it'll fail the callback. * fails it'll fail the callback.
* *
* If not all buffers can be written in one go it creates a new <code>PendingState</code> object to preserve the state * If not all buffers can be written in one go it creates a new {@code PendingState} object to preserve the state
* and then calls {@link #onIncompleteFlush()}. The remaining buffers will be written in {@link #completeWrite()}. * and then calls {@link #onIncompleteFlush()}. The remaining buffers will be written in {@link #completeWrite()}.
* *
* If all buffers have been written it calls callback.complete(). * If all buffers have been written it calls callback.complete().
* *
* @param callback the callback to call on either failed or complete * @param callback the callback to call on either failed or complete
* @param buffers the buffers to flush to the endpoint * @param buffers the buffers to flush to the endpoint
* @throws WritePendingException if unable to write due to prior pending write * @throws WritePendingException if unable to write due to prior pending write
*/ */
public void write(Callback callback, ByteBuffer... buffers) throws WritePendingException public void write(Callback callback, ByteBuffer... buffers) throws WritePendingException
@ -314,20 +315,20 @@ abstract public class WriteFlusher
if (DEBUG) if (DEBUG)
LOG.debug("write: {} {}", this, BufferUtil.toDetailString(buffers)); LOG.debug("write: {} {}", this, BufferUtil.toDetailString(buffers));
if (!updateState(__IDLE,__WRITING)) if (!updateState(__IDLE, __WRITING))
throw new WritePendingException(); throw new WritePendingException();
try try
{ {
buffers=flush(buffers); buffers = flush(buffers);
// if we are incomplete? // if we are incomplete?
if (buffers!=null) if (buffers != null)
{ {
if (DEBUG) if (DEBUG)
LOG.debug("flushed incomplete"); LOG.debug("flushed incomplete");
PendingState pending=new PendingState(buffers, callback); PendingState pending = new PendingState(buffers, callback);
if (updateState(__WRITING,pending)) if (updateState(__WRITING, pending))
onIncompleteFlush(); onIncompleteFlush();
else else
fail(pending); fail(pending);
@ -335,18 +336,18 @@ abstract public class WriteFlusher
} }
// If updateState didn't succeed, we don't care as our buffers have been written // If updateState didn't succeed, we don't care as our buffers have been written
if (!updateState(__WRITING,__IDLE)) if (!updateState(__WRITING, __IDLE))
ignoreFail(); ignoreFail();
if (callback!=null) if (callback != null)
callback.succeeded(); callback.succeeded();
} }
catch (IOException e) catch (IOException e)
{ {
if (DEBUG) if (DEBUG)
LOG.debug("write exception", e); LOG.debug("write exception", e);
if (updateState(__WRITING,__IDLE)) if (updateState(__WRITING, __IDLE))
{ {
if (callback!=null) if (callback != null)
callback.failed(e); callback.failed(e);
} }
else else
@ -354,7 +355,6 @@ abstract public class WriteFlusher
} }
} }
/** /**
* Complete a write that has not completed and that called {@link #onIncompleteFlush()} to request a call to this * Complete a write that has not completed and that called {@link #onIncompleteFlush()} to request a call to this
* method when a call to {@link EndPoint#flush(ByteBuffer...)} is likely to be able to progress. * method when a call to {@link EndPoint#flush(ByteBuffer...)} is likely to be able to progress.
@ -370,27 +370,27 @@ abstract public class WriteFlusher
State previous = _state.get(); State previous = _state.get();
if (previous.getType()!=StateType.PENDING) if (previous.getType() != StateType.PENDING)
return; // failure already handled. return; // failure already handled.
PendingState pending = (PendingState)previous; PendingState pending = (PendingState)previous;
if (!updateState(pending,__COMPLETING)) if (!updateState(pending, __COMPLETING))
return; // failure already handled. return; // failure already handled.
try try
{ {
ByteBuffer[] buffers = pending.getBuffers(); ByteBuffer[] buffers = pending.getBuffers();
buffers=flush(buffers); buffers = flush(buffers);
// if we are incomplete? // if we are incomplete?
if (buffers!=null) if (buffers != null)
{ {
if (DEBUG) if (DEBUG)
LOG.debug("flushed incomplete {}",BufferUtil.toDetailString(buffers)); LOG.debug("flushed incomplete {}", BufferUtil.toDetailString(buffers));
if (buffers!=pending.getBuffers()) if (buffers != pending.getBuffers())
pending=new PendingState(buffers, pending._callback); pending = new PendingState(buffers, pending._callback);
if (updateState(__COMPLETING,pending)) if (updateState(__COMPLETING, pending))
onIncompleteFlush(); onIncompleteFlush();
else else
fail(pending); fail(pending);
@ -398,7 +398,7 @@ abstract public class WriteFlusher
} }
// If updateState didn't succeed, we don't care as our buffers have been written // If updateState didn't succeed, we don't care as our buffers have been written
if (!updateState(__COMPLETING,__IDLE)) if (!updateState(__COMPLETING, __IDLE))
ignoreFail(); ignoreFail();
pending.complete(); pending.complete();
} }
@ -406,7 +406,7 @@ abstract public class WriteFlusher
{ {
if (DEBUG) if (DEBUG)
LOG.debug("completeWrite exception", e); LOG.debug("completeWrite exception", e);
if(updateState(__COMPLETING,__IDLE)) if (updateState(__COMPLETING, __IDLE))
pending.fail(e); pending.fail(e);
else else
fail(pending); fail(pending);
@ -422,59 +422,84 @@ abstract public class WriteFlusher
*/ */
protected ByteBuffer[] flush(ByteBuffer[] buffers) throws IOException protected ByteBuffer[] flush(ByteBuffer[] buffers) throws IOException
{ {
boolean progress=true; boolean progress = true;
while(progress && buffers!=null) while (progress && buffers != null)
{ {
int before=buffers.length==0?0:buffers[0].remaining(); long before = remaining(buffers);
boolean flushed=_endPoint.flush(buffers); boolean flushed = _endPoint.flush(buffers);
int r=buffers.length==0?0:buffers[0].remaining(); long after = remaining(buffers);
long written = before - after;
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Flushed={} {}/{}+{} {}",flushed,before-r,before,buffers.length-1,this); LOG.debug("Flushed={} written={} remaining={} {}", flushed, written, after, this);
if (written > 0)
{
Connection connection = _endPoint.getConnection();
if (connection instanceof Listener)
((Listener)connection).onFlushed(written);
}
if (flushed) if (flushed)
return null; return null;
progress=before!=r; progress = written > 0;
int not_empty=0; int index = 0;
while(r==0) while (true)
{ {
if (++not_empty==buffers.length) if (index == buffers.length)
{ {
buffers=null; // All buffers consumed.
not_empty=0; buffers = null;
index = 0;
break; break;
} }
progress=true; else
r=buffers[not_empty].remaining(); {
int remaining = buffers[index].remaining();
if (remaining > 0)
break;
++index;
progress = true;
}
} }
if (index > 0)
if (not_empty>0) buffers = Arrays.copyOfRange(buffers, index, buffers.length);
buffers=Arrays.copyOfRange(buffers,not_empty,buffers.length);
} }
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("!fully flushed {}",this); LOG.debug("!fully flushed {}", this);
// If buffers is null, then flush has returned false but has consumed all the data! // If buffers is null, then flush has returned false but has consumed all the data!
// This is probably SSL being unable to flush the encrypted buffer, so return EMPTY_BUFFERS // This is probably SSL being unable to flush the encrypted buffer, so return EMPTY_BUFFERS
// and that will keep this WriteFlusher pending. // and that will keep this WriteFlusher pending.
return buffers==null?EMPTY_BUFFERS:buffers; return buffers == null ? EMPTY_BUFFERS : buffers;
} }
/* ------------------------------------------------------------ */ private long remaining(ByteBuffer[] buffers)
/** Notify the flusher of a failure {
if (buffers == null)
return 0;
long result = 0;
for (ByteBuffer buffer : buffers)
result += buffer.remaining();
return result;
}
/**
* Notify the flusher of a failure
*
* @param cause The cause of the failure * @param cause The cause of the failure
* @return true if the flusher passed the failure to a {@link Callback} instance * @return true if the flusher passed the failure to a {@link Callback} instance
*/ */
public boolean onFail(Throwable cause) public boolean onFail(Throwable cause)
{ {
// Keep trying to handle the failure until we get to IDLE or FAILED state // Keep trying to handle the failure until we get to IDLE or FAILED state
while(true) while (true)
{ {
State current=_state.get(); State current = _state.get();
switch(current.getType()) switch (current.getType())
{ {
case IDLE: case IDLE:
case FAILED: case FAILED:
@ -487,7 +512,7 @@ abstract public class WriteFlusher
LOG.debug("failed: " + this, cause); LOG.debug("failed: " + this, cause);
PendingState pending = (PendingState)current; PendingState pending = (PendingState)current;
if (updateState(pending,__IDLE)) if (updateState(pending, __IDLE))
return pending.fail(cause); return pending.fail(cause);
break; break;
@ -495,7 +520,7 @@ abstract public class WriteFlusher
if (DEBUG) if (DEBUG)
LOG.debug("failed: " + this, cause); LOG.debug("failed: " + this, cause);
if (updateState(current,new FailedState(cause))) if (updateState(current, new FailedState(cause)))
return false; return false;
break; break;
} }
@ -512,29 +537,9 @@ abstract public class WriteFlusher
return _state.get().getType() == StateType.IDLE; return _state.get().getType() == StateType.IDLE;
} }
public boolean isInProgress()
{
switch(_state.get().getType())
{
case WRITING:
case PENDING:
case COMPLETING:
return true;
default:
return false;
}
}
@Override
public String toString()
{
State s = _state.get();
return String.format("WriteFlusher@%x{%s}->%s", hashCode(), s,s instanceof PendingState?((PendingState)s).getCallback():null);
}
public String toStateString() public String toStateString()
{ {
switch(_state.get().getType()) switch (_state.get().getType())
{ {
case WRITING: case WRITING:
return "W"; return "W";
@ -550,4 +555,33 @@ abstract public class WriteFlusher
return "?"; return "?";
} }
} }
@Override
public String toString()
{
State s = _state.get();
return String.format("WriteFlusher@%x{%s}->%s", hashCode(), s, s instanceof PendingState ? ((PendingState)s).getCallback() : null);
}
/**
* <p>A listener of {@link WriteFlusher} events.</p>
*/
public interface Listener
{
/**
* <p>Invoked when a {@link WriteFlusher} flushed bytes in a non-blocking way,
* as part of a - possibly larger - write.</p>
* <p>This method may be invoked multiple times, for example when writing a large
* buffer: a first flush of bytes, then the connection became TCP congested, and
* a subsequent flush of bytes when the connection became writable again.</p>
* <p>This method is never invoked concurrently, but may be invoked by different
* threads, so implementations may not rely on thread-local variables.</p>
* <p>Implementations may throw an {@link IOException} to signal that the write
* should fail, for example if the implementation enforces a minimum data rate.</p>
*
* @param bytes the number of bytes flushed
* @throws IOException if the write should fail
*/
void onFlushed(long bytes) throws IOException;
}
} }

View File

@ -67,6 +67,7 @@ public class HttpConfiguration
private int _maxErrorDispatches = 10; private int _maxErrorDispatches = 10;
private boolean _useDirectByteBuffers = false; private boolean _useDirectByteBuffers = false;
private long _minRequestDataRate; private long _minRequestDataRate;
private long _minResponseDataRate;
private CookieCompliance _cookieCompliance = CookieCompliance.RFC6265; private CookieCompliance _cookieCompliance = CookieCompliance.RFC6265;
private boolean _notifyRemoteAsyncErrors = true; private boolean _notifyRemoteAsyncErrors = true;
@ -129,6 +130,7 @@ public class HttpConfiguration
_maxErrorDispatches=config._maxErrorDispatches; _maxErrorDispatches=config._maxErrorDispatches;
_useDirectByteBuffers=config._useDirectByteBuffers; _useDirectByteBuffers=config._useDirectByteBuffers;
_minRequestDataRate=config._minRequestDataRate; _minRequestDataRate=config._minRequestDataRate;
_minResponseDataRate=config._minResponseDataRate;
_cookieCompliance=config._cookieCompliance; _cookieCompliance=config._cookieCompliance;
_notifyRemoteAsyncErrors=config._notifyRemoteAsyncErrors; _notifyRemoteAsyncErrors=config._notifyRemoteAsyncErrors;
} }
@ -512,7 +514,29 @@ public class HttpConfiguration
{ {
_minRequestDataRate=bytesPerSecond; _minRequestDataRate=bytesPerSecond;
} }
/**
* @return The minimum response data rate in bytes per second; or &lt;=0 for no limit
*/
@ManagedAttribute("The minimum response content data rate in bytes per second")
public long getMinResponseDataRate()
{
return _minResponseDataRate;
}
/**
* <p>Sets an minimum response content data rate.</p>
* <p>The value is enforced only approximately - not precisely - due to the fact that
* for efficiency reasons buffer writes may be comprised of both response headers and
* response content.</p>
*
* @param bytesPerSecond The minimum response data rate in bytes per second; or &lt;=0 for no limit
*/
public void setMinResponseDataRate(long bytesPerSecond)
{
_minResponseDataRate = bytesPerSecond;
}
public CookieCompliance getCookieCompliance() public CookieCompliance getCookieCompliance()
{ {
return _cookieCompliance; return _cookieCompliance;

View File

@ -40,6 +40,7 @@ import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.EofException; import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.WriteFlusher;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback; import org.eclipse.jetty.util.IteratingCallback;
@ -49,7 +50,7 @@ import org.eclipse.jetty.util.log.Logger;
/** /**
* <p>A {@link Connection} that handles the HTTP protocol.</p> * <p>A {@link Connection} that handles the HTTP protocol.</p>
*/ */
public class HttpConnection extends AbstractConnection implements Runnable, HttpTransport, Connection.UpgradeFrom public class HttpConnection extends AbstractConnection implements Runnable, HttpTransport, Connection.UpgradeFrom, WriteFlusher.Listener
{ {
private static final Logger LOG = Log.getLogger(HttpConnection.class); private static final Logger LOG = Log.getLogger(HttpConnection.class);
public static final HttpField CONNECTION_CLOSE = new PreEncodedHttpField(HttpHeader.CONNECTION,HttpHeaderValue.CLOSE.asString()); public static final HttpField CONNECTION_CLOSE = new PreEncodedHttpField(HttpHeader.CONNECTION,HttpHeaderValue.CLOSE.asString());
@ -192,6 +193,14 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
return null; return null;
} }
@Override
public void onFlushed(long bytes) throws IOException
{
// Unfortunately cannot distinguish between header and content
// bytes, and for content bytes whether they are chunked or not.
_channel.getResponse().getHttpOutput().onFlushed(bytes);
}
void releaseRequestBuffer() void releaseRequestBuffer()
{ {
if (_requestBuffer != null && !_requestBuffer.hasRemaining()) if (_requestBuffer != null && !_requestBuffer.hasRemaining())

View File

@ -282,7 +282,7 @@ public class HttpInput extends ServletInputStream implements Runnable
{ {
long minimum_data = minRequestDataRate * TimeUnit.NANOSECONDS.toMillis(period) / TimeUnit.SECONDS.toMillis(1); long minimum_data = minRequestDataRate * TimeUnit.NANOSECONDS.toMillis(period) / TimeUnit.SECONDS.toMillis(1);
if (_contentArrived < minimum_data) if (_contentArrived < minimum_data)
throw new BadMessageException(HttpStatus.REQUEST_TIMEOUT_408,String.format("Request data rate < %d B/s",minRequestDataRate)); throw new BadMessageException(HttpStatus.REQUEST_TIMEOUT_408,String.format("Request content data rate < %d B/s",minRequestDataRate));
} }
} }

View File

@ -24,6 +24,7 @@ import java.io.InputStream;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel; import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritePendingException; import java.nio.channels.WritePendingException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.RequestDispatcher; import javax.servlet.RequestDispatcher;
@ -122,12 +123,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable
private final HttpChannel _channel; private final HttpChannel _channel;
private final SharedBlockingCallback _writeBlocker; private final SharedBlockingCallback _writeBlocker;
private Interceptor _interceptor; private Interceptor _interceptor;
/**
* Bytes written via the write API (excludes bytes written via sendContent). Used to autocommit once content length is written.
*/
private long _written; private long _written;
private long _flushed;
private long _firstByteTimeStamp = -1;
private ByteBuffer _aggregate; private ByteBuffer _aggregate;
private int _bufferSize; private int _bufferSize;
private int _commitSize; private int _commitSize;
@ -231,6 +229,14 @@ public class HttpOutput extends ServletOutputStream implements Runnable
protected void write(ByteBuffer content, boolean complete, Callback callback) protected void write(ByteBuffer content, boolean complete, Callback callback)
{ {
if (_firstByteTimeStamp == -1)
{
long minDataRate = getHttpChannel().getHttpConfiguration().getMinResponseDataRate();
if (minDataRate > 0)
_firstByteTimeStamp = System.nanoTime();
else
_firstByteTimeStamp = Long.MAX_VALUE;
}
_interceptor.write(content, complete, callback); _interceptor.write(content, complete, callback);
} }
@ -908,6 +914,30 @@ public class HttpOutput extends ServletOutputStream implements Runnable
_commitSize = size; _commitSize = size;
} }
/**
* <p>Invoked when bytes have been flushed to the network.</p>
* <p>The number of flushed bytes may be different from the bytes written
* by the application if an {@link Interceptor} changed them, for example
* by compressing them.</p>
*
* @param bytes the number of bytes flushed
* @throws IOException if the minimum data rate, when set, is not respected
* @see org.eclipse.jetty.io.WriteFlusher.Listener
*/
public void onFlushed(long bytes) throws IOException
{
if (_firstByteTimeStamp == -1 || _firstByteTimeStamp == Long.MAX_VALUE)
return;
long minDataRate = getHttpChannel().getHttpConfiguration().getMinResponseDataRate();
_flushed += bytes;
long elapsed = System.nanoTime() - _firstByteTimeStamp;
long minFlushed = minDataRate * TimeUnit.NANOSECONDS.toMillis(elapsed) / TimeUnit.SECONDS.toMillis(1);
if (LOG.isDebugEnabled())
LOG.debug("Flushed bytes min/actual {}/{}", minFlushed, _flushed);
if (_flushed < minFlushed)
throw new IOException(String.format("Response content data rate < %d B/s", minDataRate));
}
public void recycle() public void recycle()
{ {
_interceptor = _channel; _interceptor = _channel;
@ -920,6 +950,8 @@ public class HttpOutput extends ServletOutputStream implements Runnable
_written = 0; _written = 0;
_writeListener = null; _writeListener = null;
_onError = null; _onError = null;
_firstByteTimeStamp = -1;
_flushed = 0;
reopen(); reopen();
} }

View File

@ -19,6 +19,8 @@
package org.eclipse.jetty.util.thread.strategy; package org.eclipse.jetty.util.thread.strategy;
import java.io.Closeable; import java.io.Closeable;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.atomic.LongAdder;
@ -32,8 +34,6 @@ import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutionStrategy; import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.Invocable; import org.eclipse.jetty.util.thread.Invocable;
import org.eclipse.jetty.util.thread.Invocable.InvocationType; import org.eclipse.jetty.util.thread.Invocable.InvocationType;
import org.eclipse.jetty.util.thread.Locker;
import org.eclipse.jetty.util.thread.Locker.Lock;
import org.eclipse.jetty.util.thread.ReservedThreadExecutor; import org.eclipse.jetty.util.thread.ReservedThreadExecutor;
/** /**
@ -66,9 +66,8 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
{ {
private static final Logger LOG = Log.getLogger(EatWhatYouKill.class); private static final Logger LOG = Log.getLogger(EatWhatYouKill.class);
private enum State { IDLE, PRODUCING, REPRODUCING } private enum State { IDLE, PENDING, PRODUCING, REPRODUCING }
private final Locker _locker = new Locker();
private final LongAdder _nonBlocking = new LongAdder(); private final LongAdder _nonBlocking = new LongAdder();
private final LongAdder _blocking = new LongAdder(); private final LongAdder _blocking = new LongAdder();
private final LongAdder _executed = new LongAdder(); private final LongAdder _executed = new LongAdder();
@ -94,19 +93,20 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
_producers = producers; _producers = producers;
addBean(_producer); addBean(_producer);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{} created", this); LOG.debug("{} created", this);
} }
@Override @Override
public void dispatch() public void dispatch()
{ {
boolean execute = false; boolean execute = false;
try (Lock locked = _locker.lock()) synchronized(this)
{ {
switch(_state) switch(_state)
{ {
case IDLE: case IDLE:
execute = true; execute = true;
_state = State.PENDING;
break; break;
case PRODUCING: case PRODUCING:
@ -136,19 +136,19 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{} produce", this); LOG.debug("{} produce", this);
boolean reproduce = true; if (tryProduce())
while(isRunning() && tryProduce(reproduce) && doProduce()) doProduce();
reproduce = false;
} }
public boolean tryProduce(boolean reproduce) private boolean tryProduce()
{ {
boolean producing = false; boolean producing = false;
try (Lock locked = _locker.lock()) synchronized(this)
{ {
switch (_state) switch (_state)
{ {
case IDLE: case IDLE:
case PENDING:
// Enter PRODUCING // Enter PRODUCING
_state = State.PRODUCING; _state = State.PRODUCING;
producing = true; producing = true;
@ -156,8 +156,7 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
case PRODUCING: case PRODUCING:
// Keep other Thread producing // Keep other Thread producing
if (reproduce) _state = State.REPRODUCING;
_state = State.REPRODUCING;
break; break;
default: default:
@ -167,7 +166,7 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
return producing; return producing;
} }
public boolean doProduce() private void doProduce()
{ {
boolean producing = true; boolean producing = true;
while (isRunning() && producing) while (isRunning() && producing)
@ -178,30 +177,28 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
{ {
task = _producer.produce(); task = _producer.produce();
} }
catch(Throwable e) catch (Throwable e)
{ {
LOG.warn(e); LOG.warn(e);
} }
if (LOG.isDebugEnabled())
LOG.debug("{} t={}/{}",this,task,Invocable.getInvocationType(task));
if (task==null) if (task==null)
{ {
try (Lock locked = _locker.lock()) synchronized(this)
{ {
// Could another one just have been queued with a produce call? // Could another task just have been queued with a produce call?
if (_state==State.REPRODUCING) switch (_state)
{ {
_state = State.PRODUCING; case PRODUCING:
} _state = State.IDLE;
else producing = false;
{ break;
if (LOG.isDebugEnabled()) case REPRODUCING:
LOG.debug("{} IDLE",toStringLocked()); _state = State.PRODUCING;
_state = State.IDLE; break;
producing = false; default:
} throw new IllegalStateException(toStringLocked());
}
} }
} }
else else
@ -209,21 +206,19 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
boolean consume; boolean consume;
if (Invocable.getInvocationType(task) == InvocationType.NON_BLOCKING) if (Invocable.getInvocationType(task) == InvocationType.NON_BLOCKING)
{ {
// PRODUCE CONSUME (EWYK!) // PRODUCE CONSUME
if (LOG.isDebugEnabled())
LOG.debug("{} PC t={}", this, task);
consume = true; consume = true;
_nonBlocking.increment(); _nonBlocking.increment();
} }
else else
{ {
try (Lock locked = _locker.lock()) synchronized(this)
{ {
if (_producers.tryExecute(this)) if (_producers.tryExecute(this))
{ {
// EXECUTE PRODUCE CONSUME! // EXECUTE PRODUCE CONSUME!
// We have executed a new Producer, so we can EWYK consume // We have executed a new Producer, so we can EWYK consume
_state = State.IDLE; _state = State.PENDING;
producing = false; producing = false;
consume = true; consume = true;
_blocking.increment(); _blocking.increment();
@ -233,13 +228,13 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
// PRODUCE EXECUTE CONSUME! // PRODUCE EXECUTE CONSUME!
consume = false; consume = false;
_executed.increment(); _executed.increment();
} }
} }
if (LOG.isDebugEnabled())
LOG.debug("{} {} t={}", this, consume ? "EPC" : "PEC", task);
} }
if (LOG.isDebugEnabled())
LOG.debug("{} p={} c={} t={}/{}", this, producing, consume, task,Invocable.getInvocationType(task));
// Consume or execute task // Consume or execute task
try try
{ {
@ -250,7 +245,10 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
} }
catch (RejectedExecutionException e) catch (RejectedExecutionException e)
{ {
LOG.warn(e); if (isRunning())
LOG.warn(e);
else
LOG.ignore(e);
if (task instanceof Closeable) if (task instanceof Closeable)
{ {
try try
@ -269,8 +267,6 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
} }
} }
} }
return producing;
} }
@ManagedAttribute(value = "number of non blocking tasks consumed", readonly = true) @ManagedAttribute(value = "number of non blocking tasks consumed", readonly = true)
@ -294,7 +290,7 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
@ManagedAttribute(value = "whether this execution strategy is idle", readonly = true) @ManagedAttribute(value = "whether this execution strategy is idle", readonly = true)
public boolean isIdle() public boolean isIdle()
{ {
try (Lock locked = _locker.lock()) synchronized(this)
{ {
return _state==State.IDLE; return _state==State.IDLE;
} }
@ -310,7 +306,7 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
public String toString() public String toString()
{ {
try (Lock locked = _locker.lock()) synchronized(this)
{ {
return toStringLocked(); return toStringLocked();
} }
@ -339,5 +335,14 @@ public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrat
builder.append(_state); builder.append(_state);
builder.append('/'); builder.append('/');
builder.append(_producers); builder.append(_producers);
builder.append("[nb=");
builder.append(getNonBlockingTasksConsumed());
builder.append(",c=");
builder.append(getBlockingTasksConsumed());
builder.append(",e=");
builder.append(getBlockingTasksExecuted());
builder.append("]");
builder.append("@");
builder.append(DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
} }
} }

View File

@ -27,11 +27,16 @@ import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.server.handler.AbstractHandler;
public class EmptyServerHandler extends AbstractHandler public class EmptyServerHandler extends AbstractHandler.ErrorDispatchHandler
{ {
@Override @Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException protected void doNonErrorHandle(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
jettyRequest.setHandled(true);
service(target, jettyRequest, request, response);
}
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{ {
baseRequest.setHandled(true);
} }
} }

View File

@ -37,7 +37,6 @@ import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.ContentResponse; import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.util.BytesContentProvider; import org.eclipse.jetty.client.util.BytesContentProvider;
@ -497,9 +496,8 @@ public class HttpClientTest extends AbstractTest
start(new EmptyServerHandler() start(new EmptyServerHandler()
{ {
@Override @Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{ {
super.handle(target, baseRequest, request, response);
response.getWriter().write("Jetty"); response.getWriter().write("Jetty");
} }
}); });

View File

@ -43,6 +43,8 @@ import org.eclipse.jetty.client.util.BufferingResponseListener;
import org.eclipse.jetty.client.util.DeferredContentProvider; import org.eclipse.jetty.client.util.DeferredContentProvider;
import org.eclipse.jetty.http.BadMessageException; import org.eclipse.jetty.http.BadMessageException;
import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http2.FlowControlStrategy;
import org.eclipse.jetty.http2.client.http.HttpClientTransportOverHTTP2;
import org.eclipse.jetty.http2.server.AbstractHTTP2ServerConnectionFactory; import org.eclipse.jetty.http2.server.AbstractHTTP2ServerConnectionFactory;
import org.eclipse.jetty.server.HttpChannel; import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Request;
@ -50,7 +52,9 @@ import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.log.StacklessLogging; import org.eclipse.jetty.util.log.StacklessLogging;
import org.hamcrest.Matchers;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test; import org.junit.Test;
public class ServerTimeoutsTest extends AbstractTest public class ServerTimeoutsTest extends AbstractTest
@ -736,6 +740,76 @@ public class ServerTimeoutsTest extends AbstractTest
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
} }
@Test
public void testBlockingWriteWithMinimumDataRateBelowLimit() throws Exception
{
// This test needs a large write to stall the server, and a slow reading client.
// In HTTP/1.1, when using the loopback interface, the buffers are so large that
// it would require a very large write (32 MiB) and a lot of time for this test
// to pass. On the first writes, the server fills in the large buffers with a lot
// of bytes (about 4 MiB), and so it would take a lot of time for the client to
// read those bytes and eventually produce a write rate that will make the server
// fail; and the write should be large enough to _not_ complete before the rate
// is below the minimum.
// In HTTP/2, we force the flow control window to be small, so that the server
// stalls almost immediately without having written many bytes, so that the test
// completes quickly.
Assume.assumeThat(transport, Matchers.isOneOf(Transport.H2, Transport.H2C));
int bytesPerSecond = 16 * 1024;
httpConfig.setMinResponseDataRate(bytesPerSecond);
CountDownLatch serverLatch = new CountDownLatch(1);
start(new EmptyServerHandler()
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
try
{
ServletOutputStream output = response.getOutputStream();
output.write(new byte[8 * 1024 * 1024]);
}
catch (IOException x)
{
serverLatch.countDown();
}
}
});
((HttpClientTransportOverHTTP2)client.getTransport()).getHTTP2Client().setInitialStreamRecvWindow(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
// Setup the client to read slower than the min data rate.
BlockingQueue<Object> objects = new LinkedBlockingQueue<>();
CountDownLatch clientLatch = new CountDownLatch(1);
client.newRequest(newURI())
.onResponseContentAsync((response, content, callback) ->
{
objects.offer(content.remaining());
objects.offer(callback);
})
.send(result ->
{
objects.offer(-1);
objects.offer(Callback.NOOP);
if (result.isFailed())
clientLatch.countDown();
});
long readRate = bytesPerSecond / 2;
while (true)
{
int bytes = (Integer)objects.poll(5, TimeUnit.SECONDS);
if (bytes < 0)
break;
long ms = bytes * 1000L / readRate;
Thread.sleep(ms);
Callback callback = (Callback)objects.poll();
callback.succeeded();
}
Assert.assertTrue(serverLatch.await(15, TimeUnit.SECONDS));
Assert.assertTrue(clientLatch.await(15, TimeUnit.SECONDS));
}
private static class BlockingReadHandler extends AbstractHandler.ErrorDispatchHandler private static class BlockingReadHandler extends AbstractHandler.ErrorDispatchHandler
{ {
private final CountDownLatch handlerLatch; private final CountDownLatch handlerLatch;

View File

@ -49,6 +49,15 @@
</execution> </execution>
</executions> </executions>
</plugin> </plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<systemPropertyVariables>
<java.util.logging.config.file>${project.build.testOutputDirectory}/logging.properties</java.util.logging.config.file>
</systemPropertyVariables>
</configuration>
</plugin>
</plugins> </plugins>
</build> </build>
<dependencies> <dependencies>
@ -88,7 +97,6 @@
<artifactId>jetty-test-helper</artifactId> <artifactId>jetty-test-helper</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -20,19 +20,29 @@ package org.eclipse.jetty.hazelcast.session;
import org.eclipse.jetty.server.session.AbstractClusteredLastAccessTimeTest; import org.eclipse.jetty.server.session.AbstractClusteredLastAccessTimeTest;
import org.eclipse.jetty.server.session.SessionDataStoreFactory; import org.eclipse.jetty.server.session.SessionDataStoreFactory;
import org.junit.After;
public class ClusteredLastAccessTimeTest public class ClusteredLastAccessTimeTest
extends AbstractClusteredLastAccessTimeTest extends AbstractClusteredLastAccessTimeTest
{ {
HazelcastSessionDataStoreFactory factory;
/** /**
* @see org.eclipse.jetty.server.session.AbstractTestBase#createSessionDataStoreFactory() * @see org.eclipse.jetty.server.session.AbstractTestBase#createSessionDataStoreFactory()
*/ */
@Override @Override
public SessionDataStoreFactory createSessionDataStoreFactory() public SessionDataStoreFactory createSessionDataStoreFactory()
{ {
HazelcastSessionDataStoreFactory factory = new HazelcastSessionDataStoreFactory(); factory = new HazelcastSessionDataStoreFactory();
factory.setMapName( Long.toString( System.currentTimeMillis() ) );
return factory; return factory;
} }
@After
public void shutdown()
{
factory.getHazelcastInstance().getMap( factory.getMapName() ).clear();
}
} }

View File

@ -21,6 +21,7 @@ package org.eclipse.jetty.hazelcast.session;
import org.eclipse.jetty.server.session.AbstractClusteredOrphanedSessionTest; import org.eclipse.jetty.server.session.AbstractClusteredOrphanedSessionTest;
import org.eclipse.jetty.server.session.SessionDataStoreFactory; import org.eclipse.jetty.server.session.SessionDataStoreFactory;
import org.junit.After;
/** /**
* ClusteredOrphanedSessionTest * ClusteredOrphanedSessionTest
@ -29,6 +30,7 @@ public class ClusteredOrphanedSessionTest
extends AbstractClusteredOrphanedSessionTest extends AbstractClusteredOrphanedSessionTest
{ {
HazelcastSessionDataStoreFactory factory;
/** /**
* @see org.eclipse.jetty.server.session.AbstractTestBase#createSessionDataStoreFactory() * @see org.eclipse.jetty.server.session.AbstractTestBase#createSessionDataStoreFactory()
@ -36,9 +38,14 @@ public class ClusteredOrphanedSessionTest
@Override @Override
public SessionDataStoreFactory createSessionDataStoreFactory() public SessionDataStoreFactory createSessionDataStoreFactory()
{ {
HazelcastSessionDataStoreFactory factory = new HazelcastSessionDataStoreFactory(); factory = new HazelcastSessionDataStoreFactory();
factory.setMapName( Long.toString( System.currentTimeMillis() ) );
return factory; return factory;
} }
@After
public void shutdown()
{
factory.getHazelcastInstance().getMap( factory.getMapName() ).clear();
}
} }

View File

@ -21,6 +21,7 @@ package org.eclipse.jetty.hazelcast.session;
import org.eclipse.jetty.server.session.AbstractClusteredSessionMigrationTest; import org.eclipse.jetty.server.session.AbstractClusteredSessionMigrationTest;
import org.eclipse.jetty.server.session.SessionDataStoreFactory; import org.eclipse.jetty.server.session.SessionDataStoreFactory;
import org.junit.After;
/** /**
* ClusteredSessionMigrationTest * ClusteredSessionMigrationTest
@ -28,6 +29,7 @@ import org.eclipse.jetty.server.session.SessionDataStoreFactory;
public class ClusteredSessionMigrationTest public class ClusteredSessionMigrationTest
extends AbstractClusteredSessionMigrationTest extends AbstractClusteredSessionMigrationTest
{ {
HazelcastSessionDataStoreFactory factory;
/** /**
* @see org.eclipse.jetty.server.session.AbstractTestBase#createSessionDataStoreFactory() * @see org.eclipse.jetty.server.session.AbstractTestBase#createSessionDataStoreFactory()
@ -35,7 +37,14 @@ public class ClusteredSessionMigrationTest
@Override @Override
public SessionDataStoreFactory createSessionDataStoreFactory() public SessionDataStoreFactory createSessionDataStoreFactory()
{ {
HazelcastSessionDataStoreFactory factory = new HazelcastSessionDataStoreFactory(); factory = new HazelcastSessionDataStoreFactory();
factory.setMapName( Long.toString( System.currentTimeMillis() ) );
return factory; return factory;
} }
@After
public void shutdown()
{
factory.getHazelcastInstance().getMap( factory.getMapName() ).clear();
}
} }

View File

@ -21,6 +21,7 @@ package org.eclipse.jetty.hazelcast.session;
import org.eclipse.jetty.server.session.AbstractClusteredSessionScavengingTest; import org.eclipse.jetty.server.session.AbstractClusteredSessionScavengingTest;
import org.eclipse.jetty.server.session.SessionDataStoreFactory; import org.eclipse.jetty.server.session.SessionDataStoreFactory;
import org.junit.After;
/** /**
* ClusteredSessionScavengingTest * ClusteredSessionScavengingTest
@ -29,13 +30,22 @@ public class ClusteredSessionScavengingTest
extends AbstractClusteredSessionScavengingTest extends AbstractClusteredSessionScavengingTest
{ {
HazelcastSessionDataStoreFactory factory;
/** /**
* @see org.eclipse.jetty.server.session.AbstractTestBase#createSessionDataStoreFactory() * @see org.eclipse.jetty.server.session.AbstractTestBase#createSessionDataStoreFactory()
*/ */
@Override @Override
public SessionDataStoreFactory createSessionDataStoreFactory() public SessionDataStoreFactory createSessionDataStoreFactory()
{ {
HazelcastSessionDataStoreFactory factory = new HazelcastSessionDataStoreFactory(); factory = new HazelcastSessionDataStoreFactory();
factory.setMapName( Long.toString( System.currentTimeMillis() ) );
return factory; return factory;
} }
@After
public void shutdown()
{
factory.getHazelcastInstance().getMap( factory.getMapName() ).clear();
}
} }

View File

@ -21,6 +21,7 @@ package org.eclipse.jetty.hazelcast.session;
import org.eclipse.jetty.server.session.AbstractModifyMaxInactiveIntervalTest; import org.eclipse.jetty.server.session.AbstractModifyMaxInactiveIntervalTest;
import org.eclipse.jetty.server.session.SessionDataStoreFactory; import org.eclipse.jetty.server.session.SessionDataStoreFactory;
import org.junit.After;
/** /**
* ModifyMaxInactiveIntervalTest * ModifyMaxInactiveIntervalTest
@ -29,14 +30,23 @@ public class ModifyMaxInactiveIntervalTest
extends AbstractModifyMaxInactiveIntervalTest extends AbstractModifyMaxInactiveIntervalTest
{ {
HazelcastSessionDataStoreFactory factory;
/** /**
* @see org.eclipse.jetty.server.session.AbstractTestBase#createSessionDataStoreFactory() * @see org.eclipse.jetty.server.session.AbstractTestBase#createSessionDataStoreFactory()
*/ */
@Override @Override
public SessionDataStoreFactory createSessionDataStoreFactory() public SessionDataStoreFactory createSessionDataStoreFactory()
{ {
HazelcastSessionDataStoreFactory factory = new HazelcastSessionDataStoreFactory(); factory = new HazelcastSessionDataStoreFactory();
factory.setMapName( Long.toString( System.currentTimeMillis() ) );
return factory; return factory;
} }
@After
public void shutdown()
{
factory.getHazelcastInstance().getMap( factory.getMapName() ).clear();
}
} }

View File

@ -21,6 +21,7 @@ package org.eclipse.jetty.hazelcast.session;
import org.eclipse.jetty.server.session.AbstractNonClusteredSessionScavengingTest; import org.eclipse.jetty.server.session.AbstractNonClusteredSessionScavengingTest;
import org.eclipse.jetty.server.session.SessionDataStoreFactory; import org.eclipse.jetty.server.session.SessionDataStoreFactory;
import org.junit.After;
import static org.junit.Assert.*; import static org.junit.Assert.*;
@ -31,6 +32,8 @@ public class NonClusteredSessionScavengingTest
extends AbstractNonClusteredSessionScavengingTest extends AbstractNonClusteredSessionScavengingTest
{ {
HazelcastSessionDataStoreFactory factory;
/** /**
* @see org.eclipse.jetty.server.session.AbstractNonClusteredSessionScavengingTest#assertSession(java.lang.String, boolean) * @see org.eclipse.jetty.server.session.AbstractNonClusteredSessionScavengingTest#assertSession(java.lang.String, boolean)
*/ */
@ -63,7 +66,14 @@ public class NonClusteredSessionScavengingTest
@Override @Override
public SessionDataStoreFactory createSessionDataStoreFactory() public SessionDataStoreFactory createSessionDataStoreFactory()
{ {
HazelcastSessionDataStoreFactory factory = new HazelcastSessionDataStoreFactory(); factory = new HazelcastSessionDataStoreFactory();
factory.setMapName( Long.toString( System.currentTimeMillis() ) );
return factory; return factory;
} }
@After
public void shutdown()
{
factory.getHazelcastInstance().getMap( factory.getMapName() ).clear();
}
} }

View File

@ -21,20 +21,30 @@ package org.eclipse.jetty.hazelcast.session;
import org.eclipse.jetty.server.session.AbstractSessionExpiryTest; import org.eclipse.jetty.server.session.AbstractSessionExpiryTest;
import org.eclipse.jetty.server.session.SessionDataStoreFactory; import org.eclipse.jetty.server.session.SessionDataStoreFactory;
import org.junit.After;
import org.junit.Test; import org.junit.Test;
public class SessionExpiryTest public class SessionExpiryTest
extends AbstractSessionExpiryTest extends AbstractSessionExpiryTest
{ {
HazelcastSessionDataStoreFactory factory;
/** /**
* @see org.eclipse.jetty.server.session.AbstractTestBase#createSessionDataStoreFactory() * @see org.eclipse.jetty.server.session.AbstractTestBase#createSessionDataStoreFactory()
*/ */
@Override @Override
public SessionDataStoreFactory createSessionDataStoreFactory() public SessionDataStoreFactory createSessionDataStoreFactory()
{ {
HazelcastSessionDataStoreFactory factory = new HazelcastSessionDataStoreFactory(); factory = new HazelcastSessionDataStoreFactory();
factory.setMapName( Long.toString( System.currentTimeMillis() ) );
return factory; return factory;
} }
@After
public void shutdown()
{
factory.getHazelcastInstance().getMap( factory.getMapName() ).clear();
}
} }

View File

@ -21,6 +21,7 @@ package org.eclipse.jetty.hazelcast.session;
import org.eclipse.jetty.server.session.AbstractSessionInvalidateCreateScavengeTest; import org.eclipse.jetty.server.session.AbstractSessionInvalidateCreateScavengeTest;
import org.eclipse.jetty.server.session.SessionDataStoreFactory; import org.eclipse.jetty.server.session.SessionDataStoreFactory;
import org.junit.After;
/** /**
* SessionInvalidateCreateScavengeTest * SessionInvalidateCreateScavengeTest
@ -29,13 +30,22 @@ public class SessionInvalidateCreateScavengeTest
extends AbstractSessionInvalidateCreateScavengeTest extends AbstractSessionInvalidateCreateScavengeTest
{ {
HazelcastSessionDataStoreFactory factory;
/** /**
* @see org.eclipse.jetty.server.session.AbstractTestBase#createSessionDataStoreFactory() * @see org.eclipse.jetty.server.session.AbstractTestBase#createSessionDataStoreFactory()
*/ */
@Override @Override
public SessionDataStoreFactory createSessionDataStoreFactory() public SessionDataStoreFactory createSessionDataStoreFactory()
{ {
HazelcastSessionDataStoreFactory factory = new HazelcastSessionDataStoreFactory(); factory = new HazelcastSessionDataStoreFactory();
factory.setMapName( Long.toString( System.currentTimeMillis() ) );
return factory; return factory;
} }
@After
public void shutdown()
{
factory.getHazelcastInstance().getMap( factory.getMapName() ).clear();
}
} }

View File

@ -32,7 +32,7 @@ public class ClientLastAccessTimeTest
extends AbstractClusteredLastAccessTimeTest extends AbstractClusteredLastAccessTimeTest
{ {
private static final String MAP_NAME = "jetty_foo_session"; private static final String MAP_NAME = Long.toString( System.currentTimeMillis() );
private HazelcastInstance hazelcastInstance; private HazelcastInstance hazelcastInstance;

View File

@ -19,9 +19,15 @@
package org.eclipse.jetty.hazelcast.session.client; package org.eclipse.jetty.hazelcast.session.client;
import com.hazelcast.config.Config;
import com.hazelcast.config.MapConfig;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import org.eclipse.jetty.hazelcast.session.HazelcastSessionDataStoreFactory; import org.eclipse.jetty.hazelcast.session.HazelcastSessionDataStoreFactory;
import org.eclipse.jetty.server.session.AbstractModifyMaxInactiveIntervalTest; import org.eclipse.jetty.server.session.AbstractModifyMaxInactiveIntervalTest;
import org.eclipse.jetty.server.session.SessionDataStoreFactory; import org.eclipse.jetty.server.session.SessionDataStoreFactory;
import org.junit.After;
import org.junit.Before;
/** /**
* ModifyMaxInactiveIntervalTest * ModifyMaxInactiveIntervalTest
@ -30,6 +36,27 @@ public class ClientModifyMaxInactiveIntervalTest
extends AbstractModifyMaxInactiveIntervalTest extends AbstractModifyMaxInactiveIntervalTest
{ {
private static final String MAP_NAME = Long.toString( System.currentTimeMillis() );
private HazelcastInstance hazelcastInstance;
@Before
public void startHazelcast()
throws Exception
{
Config config = new Config().addMapConfig( new MapConfig().setName( MAP_NAME ) ) //
.setInstanceName( "beer" );
// start Hazelcast instance
hazelcastInstance = Hazelcast.getOrCreateHazelcastInstance( config );
}
@After
public void stopHazelcast()
throws Exception
{
hazelcastInstance.shutdown();
}
/** /**
* @see org.eclipse.jetty.server.session.AbstractTestBase#createSessionDataStoreFactory() * @see org.eclipse.jetty.server.session.AbstractTestBase#createSessionDataStoreFactory()
*/ */
@ -37,6 +64,8 @@ public class ClientModifyMaxInactiveIntervalTest
public SessionDataStoreFactory createSessionDataStoreFactory() public SessionDataStoreFactory createSessionDataStoreFactory()
{ {
HazelcastSessionDataStoreFactory factory = new HazelcastSessionDataStoreFactory(); HazelcastSessionDataStoreFactory factory = new HazelcastSessionDataStoreFactory();
factory.setOnlyClient( true );
factory.setMapName( MAP_NAME );
return factory; return factory;
} }

View File

@ -60,7 +60,7 @@ public class ClientNonClusteredSessionScavengingTest
fail( e.getMessage() ); fail( e.getMessage() );
} }
} }
private static final String MAP_NAME = "jetty_foo_session"; private static final String MAP_NAME = Long.toString( System.currentTimeMillis() );
private HazelcastInstance hazelcastInstance; private HazelcastInstance hazelcastInstance;

View File

@ -33,7 +33,7 @@ public class ClientOrphanedSessionTest
extends AbstractClusteredOrphanedSessionTest extends AbstractClusteredOrphanedSessionTest
{ {
private static final String MAP_NAME = "jetty_foo_session"; private static final String MAP_NAME = Long.toString( System.currentTimeMillis() );
private HazelcastInstance hazelcastInstance; private HazelcastInstance hazelcastInstance;

View File

@ -33,7 +33,7 @@ public class ClientSessionExpiryTest
extends AbstractSessionExpiryTest extends AbstractSessionExpiryTest
{ {
private static final String MAP_NAME = "jetty_foo_session"; private static final String MAP_NAME = Long.toString( System.currentTimeMillis() );
private HazelcastInstance hazelcastInstance; private HazelcastInstance hazelcastInstance;

View File

@ -32,7 +32,7 @@ import org.junit.Before;
public class ClientSessionInvalidateCreateScavengeTest public class ClientSessionInvalidateCreateScavengeTest
extends AbstractSessionInvalidateCreateScavengeTest extends AbstractSessionInvalidateCreateScavengeTest
{ {
private static final String MAP_NAME = "jetty_foo_session"; private static final String MAP_NAME = Long.toString( System.currentTimeMillis() );
private HazelcastInstance hazelcastInstance; private HazelcastInstance hazelcastInstance;

View File

@ -35,7 +35,7 @@ import org.junit.Before;
public class ClientSessionMigrationTest public class ClientSessionMigrationTest
extends AbstractClusteredSessionMigrationTest extends AbstractClusteredSessionMigrationTest
{ {
private static final String MAP_NAME = "jetty_foo_session"; private static final String MAP_NAME = Long.toString( System.currentTimeMillis() );
private HazelcastInstance hazelcastInstance; private HazelcastInstance hazelcastInstance;

View File

@ -33,7 +33,7 @@ public class ClientSessionScavengingTest
extends AbstractClusteredSessionScavengingTest extends AbstractClusteredSessionScavengingTest
{ {
private static final String MAP_NAME = "jetty_foo_session"; private static final String MAP_NAME = Long.toString( System.currentTimeMillis() );
private HazelcastInstance hazelcastInstance; private HazelcastInstance hazelcastInstance;

View File

@ -0,0 +1 @@
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog

View File

@ -0,0 +1,3 @@
handlers=java.util.logging.ConsoleHandler
.level=INFO
com.hazelcast.level=SEVERE