Merge branch 'master' into jetty-9.4.x-Feature

Greg Wilkins 2016-03-02 15:21:20 +01:00
commit d1504055de
51 changed files with 1512 additions and 465 deletions

View File

@ -1,5 +1,48 @@
jetty-9.4.0-SNAPSHOT
jetty-9.3.8.RC0 - 25 February 2016
+ 81 Exception not always thrown in Jetty to application when upload part is too big
+ 82 Request.getPart() that results in Exception still allows other parts to be fetched
+ 251 Removing SSLEngine.beginHandshake() calls
+ 285 PathContentProvider - Use of Direct buffers without pooling
+ 298 qtp threads spin-locked in MBeanContainer.beanAdded
+ 342 Reintroducing Response parameter to logExtended
+ 344 init script does not properly display status of a non running service
+ 346 HttpParser RFC2616 Compliance mode
+ 347 Avoid sending request using a connection that is idle timing out
+ 352 Integrate session idling for MongoSessionManager
+ 354 Spin loop in case of exception thrown during accept()
+ 355 Improve close behaviour
+ 478918 Change javax.servlet.error,forward,include literals to RequestDispatcher constants
+ 484446 InputStreamResponseListener's InputStream uses default read (3) and blocks early on never-ending response.
+ 485306 HttpParser (HttpURI) mistaking basic auth password as a port number
+ 485469 permessage-deflate extension causes protocol error in Firefox/Chrome
+ 486394 Restore MultiPartFilter behavior with regards to temp file access
+ 486497 NPE in MappedLoginService
+ 486511 Server.getURI() returns wrong scheme on SSL/HTTPS
+ 486530 Handler added to WebAppContext prevents ServletContext initialization
+ 486589 HttpRequest has a wrong HTTP Version in HTTP/2.
+ 486604 Add debug logging of ErrorPageErrorHandler logic
+ 486674 Quickstart path attribute normalization should be based on longest path match
+ 486829 Cancel stream error after a failed request with the HTTP/2.0 client.
+ 486877 Google Chrome flagging 'obsolete cipher suite' in Jetty and will soon issue broken padlock
+ 486930 Selector does not correctly handle rejected execution exception
+ 487158 Switched SCM URIs to github
+ 487197 Deflater/Inflater memory leak with WebSocket permessage-deflate extension
+ 487198 ContextScopeListener should be called on context start and stop
+ 487277 Introduce http-forwarded module for X-Forwarded support
+ 487354 Aborted request or response does not send RST_STREAM frame.
+ 487511 Jetty HTTP won't work on turkish systems.
+ 487714 Avoid NPE in close race for async write
+ 487750 HTTP/2 push must not be recursive.
jetty-9.2.15.v20160210 - 10 February 2016
+ 482042 New API, Allow customization of ServletHandler path mapping
+ 482243 Fixed GzipHandler for Include.

View File

@ -313,6 +313,9 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
{
if (LOG.isDebugEnabled())
LOG.debug("Aborted before processing {}: {}", exchange, cause);
// Won't use this connection, release it back.
if (!connectionPool.release(connection))
connection.close();
// It may happen that the request is aborted before the exchange
// is created. Aborting the exchange a second time will result in
// a no-operation, so we just abort here to cover that edge case.
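
The comment above describes the abort-before-processing path: the connection reserved for the exchange is handed back to the pool, and closed if the pool refuses it. A minimal standalone sketch of that release-or-close idiom follows; the ConnectionPool and Connection types are hypothetical stand-ins, not the Jetty classes.

// Hypothetical stand-ins for the pool and connection types; the point is the idiom,
// not the Jetty API.
interface Connection
{
    void close();
}

interface ConnectionPool
{
    // Returns false when the connection cannot be returned to the pool.
    boolean release(Connection connection);
}

class AbortedExchangeCleanup
{
    static void discard(ConnectionPool pool, Connection connection)
    {
        // The aborted exchange won't use this connection: try to give it back
        // to the pool, and close it if the pool refuses it so it is not leaked.
        if (!pool.release(connection))
            connection.close();
    }
}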

View File

@ -32,6 +32,7 @@ import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.util.BasicAuthentication;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.B64Code;
@ -157,4 +158,100 @@ public class HttpClientProxyTest extends AbstractHttpClientServerTest
Assert.assertEquals(status, response3.getStatus());
Assert.assertEquals(1, requests.get());
}
@Test
public void testAuthenticatedProxiedRequestWithRedirect() throws Exception
{
String user = "foo";
String password = "bar";
String credentials = B64Code.encode(user + ":" + password, StandardCharsets.ISO_8859_1);
String proxyHost = "localhost";
String serverHost = "server";
int serverPort = HttpScheme.HTTP.is(scheme) ? 80 : 443;
String realm = "test_realm";
int status = HttpStatus.NO_CONTENT_204;
start(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
if (target.startsWith("/proxy"))
{
String authorization = request.getHeader(HttpHeader.PROXY_AUTHORIZATION.asString());
if (authorization == null)
{
response.setStatus(HttpStatus.PROXY_AUTHENTICATION_REQUIRED_407);
response.setHeader(HttpHeader.PROXY_AUTHENTICATE.asString(), "Basic realm=\"" + realm + "\"");
}
else
{
String prefix = "Basic ";
if (authorization.startsWith(prefix))
{
String attempt = authorization.substring(prefix.length());
if (credentials.equals(attempt))
{
// Change also the host, to verify that proxy authentication works in this case too.
response.sendRedirect(scheme + "://127.0.0.1:" + serverPort + "/server");
}
}
}
}
else if (target.startsWith("/server"))
{
response.setStatus(status);
}
else
{
response.sendError(HttpStatus.INTERNAL_SERVER_ERROR_500);
}
}
});
int proxyPort = connector.getLocalPort();
client.getProxyConfiguration().getProxies().add(new HttpProxy(proxyHost, proxyPort));
ContentResponse response1 = client.newRequest(serverHost, serverPort)
.scheme(scheme)
.path("/proxy")
.timeout(5, TimeUnit.SECONDS)
.send();
// No Authentication available => 407.
Assert.assertEquals(HttpStatus.PROXY_AUTHENTICATION_REQUIRED_407, response1.getStatus());
// Add authentication...
URI uri = URI.create(scheme + "://" + proxyHost + ":" + proxyPort);
client.getAuthenticationStore().addAuthentication(new BasicAuthentication(uri, realm, user, password));
final AtomicInteger requests = new AtomicInteger();
client.getRequestListeners().add(new Request.Listener.Adapter()
{
@Override
public void onSuccess(Request request)
{
requests.incrementAndGet();
}
});
// ...and perform the request again => 407 + 302 + 204.
ContentResponse response2 = client.newRequest(serverHost, serverPort)
.scheme(scheme)
.path("/proxy")
.timeout(5, TimeUnit.SECONDS)
.send();
Assert.assertEquals(status, response2.getStatus());
Assert.assertEquals(3, requests.get());
// Now the authentication result is cached => 204.
requests.set(0);
ContentResponse response3 = client.newRequest(serverHost, serverPort)
.scheme(scheme)
.path("/server")
.timeout(5, TimeUnit.SECONDS)
.send();
Assert.assertEquals(status, response3.getStatus());
Assert.assertEquals(1, requests.get());
}
}

View File

@ -47,6 +47,33 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest
super(sslContextFactory);
}
@Test
public void testAbortBeforeQueued() throws Exception
{
start(new EmptyServerHandler());
Exception failure = new Exception("oops");
try
{
Request request = client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.timeout(5, TimeUnit.SECONDS);
request.abort(failure);
request.send();
Assert.fail();
}
catch (ExecutionException x)
{
Assert.assertSame(failure, x.getCause());
// Make sure the pool is in a sane state.
HttpDestination destination = (HttpDestination)client.getDestination(scheme, "localhost", connector.getLocalPort());
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
Assert.assertEquals(1, connectionPool.getConnectionCount());
Assert.assertEquals(0, connectionPool.getActiveConnections().size());
Assert.assertEquals(1, connectionPool.getIdleConnections().size());
}
}
@Test
public void testAbortOnQueued() throws Exception
{

View File

@ -89,7 +89,7 @@ public class ClientGenerator extends Generator
beginRequestBuffer.putInt(0x00_08_00_00);
// Hardcode RESPONDER role and KEEP_ALIVE flag
beginRequestBuffer.putLong(0x00_01_01_00_00_00_00_00L);
beginRequestBuffer.flip();
BufferUtil.flipToFlush(beginRequestBuffer, 0);
int index = 0;
while (fieldsLength > 0)
@ -129,7 +129,7 @@ public class ClientGenerator extends Generator
}
buffer.putShort(4, (short)length);
buffer.flip();
BufferUtil.flipToFlush(buffer, 0);
}
@ -140,7 +140,7 @@ public class ClientGenerator extends Generator
// Generate the last FCGI_PARAMS frame
lastParamsBuffer.putInt(0x01_04_00_00 + request);
lastParamsBuffer.putInt(0x00_00_00_00);
lastParamsBuffer.flip();
BufferUtil.flipToFlush(lastParamsBuffer, 0);
return result;
}
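
The FastCGI generator hunks above (and the similar ones in Generator and ServerGenerator that follow) replace raw ByteBuffer.flip() calls with BufferUtil.flipToFlush(buffer, 0). Below is a minimal standalone sketch of the fill/flush convention those helpers express, assuming only the two BufferUtil calls already used elsewhere in this diff.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import org.eclipse.jetty.util.BufferUtil;

public class FlipToFlushSketch
{
    public static void main(String[] args)
    {
        ByteBuffer buffer = ByteBuffer.allocate(64);

        // Put the buffer into "fill" mode before writing into it.
        BufferUtil.clearToFill(buffer);
        buffer.put("hello".getBytes(StandardCharsets.US_ASCII));

        // Return the buffer to "flush" mode; the second argument is the
        // position at which the valid data begins (0 here), so readers see
        // exactly the bytes that were just filled.
        BufferUtil.flipToFlush(buffer, 0);

        byte[] out = new byte[buffer.remaining()];
        buffer.get(out);
        System.out.println(new String(out, StandardCharsets.US_ASCII));
    }
}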

View File

@ -58,7 +58,7 @@ public class Generator
int length = Math.min(MAX_CONTENT_LENGTH, contentLength);
buffer.putShort((short)length);
buffer.putShort((short)0);
buffer.flip();
BufferUtil.flipToFlush(buffer, 0);
if (contentLength == 0)
break;

View File

@ -95,7 +95,7 @@ public class ServerGenerator extends Generator
buffer.put(bytes.get(i)).put(COLON).put(bytes.get(i + 1)).put(EOL);
buffer.put(EOL);
buffer.flip();
BufferUtil.flipToFlush(buffer, 0);
return generateContent(request, buffer, true, false, callback, FCGI.FrameType.STDOUT);
}
@ -129,7 +129,7 @@ public class ServerGenerator extends Generator
endRequestBuffer.putInt(0x00_08_00_00);
endRequestBuffer.putInt(aborted ? 1 : 0);
endRequestBuffer.putInt(0);
endRequestBuffer.flip();
BufferUtil.flipToFlush(endRequestBuffer, 0);
return endRequestBuffer;
}
}

View File

@ -27,7 +27,7 @@
<Arg>
<Ref id="Server"/>
</Arg>
<Set name="workerName"><Property name="jetty.gcloudSession.workerName" default="node1"/></Set>
<Set name="workerName"><Property name="jetty.gcloudSession.workerName"><Default>node<Env name="GAE_MODULE_INSTANCE" default="0"/></Default></Property></Set>
<Set name="config"><Ref id="gconf"/></Set>
</New>
</Set>

View File

@ -52,7 +52,8 @@ https://github.com/GoogleCloudPlatform/gcloud-java
http://www.apache.org/licenses/LICENSE-2.0.html
[ini-template]
## Unique identifier for this node in the cluster
## Unique identifier to force the workername for this node in the cluster
## If not set, will default to the string "node" plus the Env variable $GAE_MODULE_INSTANCE
# jetty.gcloudSession.workerName=node1
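
When jetty.gcloudSession.workerName is left unset, the XML above builds the worker name from the literal "node" and the GAE_MODULE_INSTANCE environment variable (defaulting to "0"). A small illustrative sketch of that computation in plain Java:

// Sketch (illustration only) of the default worker name produced by the XML above
// when jetty.gcloudSession.workerName is not set: the literal "node" followed by
// the GAE_MODULE_INSTANCE environment variable, or "0" if that variable is absent.
public class WorkerNameDefault
{
    public static void main(String[] args)
    {
        String instance = System.getenv("GAE_MODULE_INSTANCE");
        String workerName = "node" + (instance != null ? instance : "0");
        System.out.println(workerName);
    }
}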

View File

@ -88,6 +88,56 @@ public class HTTP2Test extends AbstractTest
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@Test
public void testRequestNoContentResponseEmptyContent() throws Exception
{
start(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
stream.headers(new HeadersFrame(stream.getId(), response, null, false), new Callback()
{
@Override
public void succeeded()
{
stream.data(new DataFrame(stream.getId(), BufferUtil.EMPTY_BUFFER, true), NOOP);
}
});
return null;
}
});
Session session = newClient(new Session.Listener.Adapter());
HttpFields fields = new HttpFields();
MetaData.Request metaData = newRequest("GET", fields);
HeadersFrame frame = new HeadersFrame(metaData, null, true);
final CountDownLatch latch = new CountDownLatch(1);
session.newStream(frame, new Promise.Adapter<>(), new Stream.Listener.Adapter()
{
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
Assert.assertFalse(frame.isEndStream());
Assert.assertEquals(stream.getId(), frame.getStreamId());
MetaData.Response response = (MetaData.Response)frame.getMetaData();
Assert.assertEquals(200, response.getStatus());
}
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
Assert.assertTrue(frame.isEndStream());
callback.succeeded();
latch.countDown();
}
});
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@Test
public void testRequestNoContentResponseContent() throws Exception
{

View File

@ -357,6 +357,7 @@ public class IdleTimeoutTest extends AbstractTest
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
callback.succeeded();
dataLatch.countDown();
}
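
This commit adds callback.succeeded() to many onData(...) listeners (here and in PushCacheFilterTest, StreamResetTest and others) so the received DATA frame is marked as consumed before any test latch is counted down. A minimal sketch of that listener shape, based only on the Stream.Listener API already shown in these hunks:

import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.util.Callback;

// A listener that consumes DATA frames immediately: succeeding the callback
// signals that the frame's bytes have been consumed, which is what the test
// fixes in this commit do before touching their latches.
public class ConsumingStreamListener extends Stream.Listener.Adapter
{
    @Override
    public void onData(Stream stream, DataFrame frame, Callback callback)
    {
        // ... inspect frame.getData() here if needed ...
        callback.succeeded();
    }
}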

View File

@ -0,0 +1,231 @@
//
// ========================================================================
// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http2.client;
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.ISession;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.Frame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
public class InterleavingTest extends AbstractTest
{
@Test
public void testInterleaving() throws Exception
{
CountDownLatch serverStreamsLatch = new CountDownLatch(2);
List<Stream> serverStreams = new ArrayList<>();
start(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
serverStreams.add(stream);
serverStreamsLatch.countDown();
return null;
}
});
int maxFrameSize = Frame.DEFAULT_MAX_LENGTH + 1;
Session session = newClient(new Session.Listener.Adapter()
{
@Override
public Map<Integer, Integer> onPreface(Session session)
{
Map<Integer, Integer> settings = new HashMap<>();
settings.put(SettingsFrame.MAX_FRAME_SIZE, maxFrameSize);
return settings;
}
});
BlockingQueue<FrameBytesCallback> dataFrames = new LinkedBlockingDeque<>();
Stream.Listener streamListener = new Stream.Listener.Adapter()
{
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
ByteBuffer data = frame.getData();
byte[] bytes = new byte[data.remaining()];
data.get(bytes);
dataFrames.offer(new FrameBytesCallback(frame, bytes, callback));
}
};
HeadersFrame headersFrame1 = new HeadersFrame(newRequest("GET", new HttpFields()), null, true);
FuturePromise<Stream> streamPromise1 = new FuturePromise<>();
session.newStream(headersFrame1, streamPromise1, streamListener);
streamPromise1.get(5, TimeUnit.SECONDS);
HeadersFrame headersFrame2 = new HeadersFrame(newRequest("GET", new HttpFields()), null, true);
FuturePromise<Stream> streamPromise2 = new FuturePromise<>();
session.newStream(headersFrame2, streamPromise2, streamListener);
streamPromise2.get(5, TimeUnit.SECONDS);
Assert.assertTrue(serverStreamsLatch.await(5, TimeUnit.SECONDS));
Thread.sleep(1000);
Stream serverStream1 = serverStreams.get(0);
Stream serverStream2 = serverStreams.get(1);
MetaData.Response response1 = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields(), 0);
serverStream1.headers(new HeadersFrame(serverStream1.getId(), response1, null, false), Callback.NOOP);
Random random = new Random();
byte[] content1 = new byte[2 * ((ISession)serverStream1.getSession()).updateSendWindow(0)];
random.nextBytes(content1);
byte[] content2 = new byte[2 * ((ISession)serverStream2.getSession()).updateSendWindow(0)];
random.nextBytes(content2);
MetaData.Response response2 = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields(), 0);
serverStream2.headers(new HeadersFrame(serverStream2.getId(), response2, null, false), new Callback()
{
@Override
public void succeeded()
{
// Write data for both streams from within the callback so that they get queued together.
ByteBuffer buffer1 = ByteBuffer.wrap(content1);
serverStream1.data(new DataFrame(serverStream1.getId(), buffer1, true), NOOP);
ByteBuffer buffer2 = ByteBuffer.wrap(content2);
serverStream2.data(new DataFrame(serverStream2.getId(), buffer2, true), NOOP);
}
});
// The client reads with a buffer size that is different from the
// frame size and synthesizes DATA frames, so expect N frames for
// stream1 up to maxFrameSize of data, then M frames for stream2
// up to maxFrameSize of data, and so forth, interleaved.
Map<Integer, ByteArrayOutputStream> contents = new HashMap<>();
contents.put(serverStream1.getId(), new ByteArrayOutputStream());
contents.put(serverStream2.getId(), new ByteArrayOutputStream());
List<StreamLength> streamLengths = new ArrayList<>();
int finished = 0;
while (finished < 2)
{
FrameBytesCallback frameBytesCallback = dataFrames.poll(5, TimeUnit.SECONDS);
if (frameBytesCallback == null)
Assert.fail();
DataFrame dataFrame = frameBytesCallback.frame;
int streamId = dataFrame.getStreamId();
int length = dataFrame.remaining();
streamLengths.add(new StreamLength(streamId, length));
if (dataFrame.isEndStream())
++finished;
contents.get(streamId).write(frameBytesCallback.bytes);
frameBytesCallback.callback.succeeded();
}
// Verify that the content has been sent properly.
Assert.assertArrayEquals(content1, contents.get(serverStream1.getId()).toByteArray());
Assert.assertArrayEquals(content2, contents.get(serverStream2.getId()).toByteArray());
// Verify that the interleaving is correct.
Map<Integer, List<Integer>> groups = new HashMap<>();
groups.put(serverStream1.getId(), new ArrayList<>());
groups.put(serverStream2.getId(), new ArrayList<>());
int currentStream = 0;
int currentLength = 0;
for (StreamLength streamLength : streamLengths)
{
if (currentStream == 0)
currentStream = streamLength.stream;
if (currentStream != streamLength.stream)
{
groups.get(currentStream).add(currentLength);
currentStream = streamLength.stream;
currentLength = 0;
}
currentLength += streamLength.length;
}
groups.get(currentStream).add(currentLength);
Logger logger = Log.getLogger(getClass());
logger.debug("frame lengths = {}", streamLengths);
groups.forEach((stream, lengths) ->
{
logger.debug("stream {} interleaved lengths = {}", stream, lengths);
for (Integer length : lengths)
Assert.assertThat(length, Matchers.lessThanOrEqualTo(maxFrameSize));
});
}
private static class FrameBytesCallback
{
private final DataFrame frame;
private final byte[] bytes;
private final Callback callback;
private FrameBytesCallback(DataFrame frame, byte[] bytes, Callback callback)
{
this.frame = frame;
this.bytes = bytes;
this.callback = callback;
}
}
private static class StreamLength
{
private final int stream;
private final int length;
private StreamLength(int stream, int length)
{
this.stream = stream;
this.length = length;
}
@Override
public String toString()
{
return String.format("(%d,%d)", stream, length);
}
}
}

View File

@ -53,6 +53,7 @@ import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.util.ArrayQueue;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.junit.Assert;
@ -168,12 +169,12 @@ public class PrefaceTest extends AbstractTest
ByteBuffer buffer = byteBufferPool.acquire(1024, true);
while (true)
{
BufferUtil.clearToFill(buffer);
int read = socket.read(buffer);
buffer.flip();
BufferUtil.flipToFlush(buffer, 0);
if (read < 0)
break;
parser.parse(buffer);
buffer.clear();
}
Assert.assertEquals(2, settings.size());
@ -248,9 +249,9 @@ public class PrefaceTest extends AbstractTest
ByteBuffer buffer = byteBufferPool.acquire(1024, true);
http1: while (true)
{
buffer.clear();
BufferUtil.clearToFill(buffer);
int read = socket.read(buffer);
buffer.flip();
BufferUtil.flipToFlush(buffer, 0);
if (read < 0)
Assert.fail();
@ -314,9 +315,9 @@ public class PrefaceTest extends AbstractTest
if (responded.get())
break;
buffer.clear();
BufferUtil.clearToFill(buffer);
int read = socket.read(buffer);
buffer.flip();
BufferUtil.flipToFlush(buffer, 0);
if (read < 0)
Assert.fail();
}

View File

@ -101,6 +101,7 @@ public class PushCacheFilterTest extends AbstractTest
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
callback.succeeded();
warmupLatch.countDown();
}
});
@ -188,6 +189,7 @@ public class PushCacheFilterTest extends AbstractTest
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
callback.succeeded();
warmupLatch.countDown();
}
});
@ -273,6 +275,7 @@ public class PushCacheFilterTest extends AbstractTest
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
callback.succeeded();
warmupLatch.countDown();
}
});
@ -298,6 +301,7 @@ public class PushCacheFilterTest extends AbstractTest
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
callback.succeeded();
pushLatch.countDown();
}
};
@ -325,6 +329,7 @@ public class PushCacheFilterTest extends AbstractTest
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
callback.succeeded();
if (frame.isEndStream())
secondaryResponseLatch.countDown();
}
@ -372,6 +377,7 @@ public class PushCacheFilterTest extends AbstractTest
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
callback.succeeded();
warmupLatch.countDown();
}
});
@ -655,6 +661,7 @@ public class PushCacheFilterTest extends AbstractTest
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
callback.succeeded();
if (frame.isEndStream())
warmupLatch.countDown();
}
@ -676,6 +683,7 @@ public class PushCacheFilterTest extends AbstractTest
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
callback.succeeded();
if (frame.isEndStream())
primaryResponseLatch.countDown();
}

View File

@ -130,8 +130,14 @@ public class StreamCloseTest extends AbstractTest
public void onData(final Stream stream, DataFrame frame, final Callback callback)
{
Assert.assertTrue(((HTTP2Stream)stream).isRemotelyClosed());
// We must copy the data that we send asynchronously.
ByteBuffer data = frame.getData();
ByteBuffer copy = ByteBuffer.allocate(data.remaining());
copy.put(data).flip();
completable.thenRun(() ->
stream.data(frame, new Callback()
stream.data(new DataFrame(stream.getId(), copy, frame.isEndStream()), new Callback()
{
@Override
public void succeeded()
@ -157,6 +163,7 @@ public class StreamCloseTest extends AbstractTest
public void onData(Stream stream, DataFrame frame, Callback callback)
{
// The sent data callback may not be notified yet here.
callback.succeeded();
completeLatch.countDown();
}
});

View File

@ -70,6 +70,10 @@ public class StreamCountTest extends AbstractTest
MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, fields);
stream.headers(new HeadersFrame(stream.getId(), metaData, null, true), callback);
}
else
{
callback.succeeded();
}
}
};
}
@ -144,6 +148,10 @@ public class StreamCountTest extends AbstractTest
MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, fields);
stream.headers(new HeadersFrame(stream.getId(), metaData, null, true), callback);
}
else
{
callback.succeeded();
}
}
};
}

View File

@ -181,6 +181,7 @@ public class StreamResetTest extends AbstractTest
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
callback.succeeded();
stream1DataLatch.countDown();
}
});
@ -196,6 +197,7 @@ public class StreamResetTest extends AbstractTest
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
callback.succeeded();
stream2DataLatch.countDown();
}
});

View File

@ -22,9 +22,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import org.eclipse.jetty.http2.frames.Frame;
@ -43,11 +41,11 @@ public class HTTP2Flusher extends IteratingCallback
private final Queue<WindowEntry> windows = new ArrayDeque<>();
private final ArrayQueue<Entry> frames = new ArrayQueue<>(ArrayQueue.DEFAULT_CAPACITY, ArrayQueue.DEFAULT_GROWTH, this);
private final Map<IStream, Integer> streams = new HashMap<>();
private final List<Entry> resets = new ArrayList<>();
private final Queue<Entry> entries = new ArrayDeque<>();
private final List<Entry> actives = new ArrayList<>();
private final HTTP2Session session;
private final ByteBufferPool.Lease lease;
private Entry stalled;
private boolean terminated;
public HTTP2Flusher(HTTP2Session session)
@ -106,17 +104,6 @@ public class HTTP2Flusher extends IteratingCallback
return !closed;
}
private Entry remove(int index)
{
synchronized (this)
{
if (index == 0)
return frames.pollUnsafe();
else
return frames.remove(index);
}
}
public int getQueueSize()
{
synchronized (this)
@ -136,112 +123,75 @@ public class HTTP2Flusher extends IteratingCallback
if (terminated)
throw new ClosedChannelException();
// First thing, update the window sizes, so we can
// reason about the frames to remove from the queue.
while (!windows.isEmpty())
{
WindowEntry entry = windows.poll();
entry.perform();
}
// Now the window sizes cannot change.
// Window updates that happen concurrently will
// be queued and processed on the next iteration.
int sessionWindow = session.getSendWindow();
int index = 0;
int size = frames.size();
while (index < size)
if (!frames.isEmpty())
{
Entry entry = frames.get(index);
IStream stream = entry.stream;
// If the stream has been reset, don't send the frame.
if (stream != null && stream.isReset() && !entry.isProtocol())
for (Entry entry : frames)
{
remove(index);
--size;
resets.add(entry);
if (LOG.isDebugEnabled())
LOG.debug("Gathered for reset {}", entry);
continue;
entries.offer(entry);
actives.add(entry);
}
// Check if the frame fits in the flow control windows.
int remaining = entry.dataRemaining();
if (remaining > 0)
{
if (sessionWindow <= 0)
{
++index;
// There may be *non* flow controlled frames to send.
continue;
}
if (stream != null)
{
// The stream may have a smaller window than the session.
Integer streamWindow = streams.get(stream);
if (streamWindow == null)
{
streamWindow = stream.updateSendWindow(0);
streams.put(stream, streamWindow);
}
// Is it a frame belonging to an already stalled stream ?
if (streamWindow <= 0)
{
++index;
// There may be *non* flow controlled frames to send.
continue;
}
}
// The frame fits both flow control windows, reduce them.
sessionWindow -= remaining;
if (stream != null)
streams.put(stream, streams.get(stream) - remaining);
}
// The frame will be written, remove it from the queue.
remove(index);
--size;
actives.add(entry);
if (LOG.isDebugEnabled())
LOG.debug("Gathered for write {}", entry);
frames.clear();
}
streams.clear();
}
// Perform resets outside the sync block.
for (int i = 0; i < resets.size(); ++i)
{
Entry entry = resets.get(i);
entry.reset();
}
resets.clear();
if (actives.isEmpty())
if (entries.isEmpty())
{
if (LOG.isDebugEnabled())
LOG.debug("Flushed {}", session);
return Action.IDLE;
}
for (int i = 0; i < actives.size(); ++i)
while (!entries.isEmpty())
{
Entry entry = actives.get(i);
Throwable failure = entry.generate(lease);
if (failure != null)
Entry entry = entries.poll();
if (LOG.isDebugEnabled())
LOG.debug("Processing {}", entry);
// If the stream has been reset, don't send the frame.
if (entry.reset())
{
if (LOG.isDebugEnabled())
LOG.debug("Resetting {}", entry);
continue;
}
try
{
if (entry.generate(lease))
{
if (entry.dataRemaining() > 0)
entries.offer(entry);
}
else
{
if (stalled == null)
stalled = entry;
}
}
catch (Throwable failure)
{
// Failure to generate the entry is catastrophic.
if (LOG.isDebugEnabled())
LOG.debug("Failure generating frame " + entry.frame, failure);
failed(failure);
return Action.SUCCEEDED;
}
}
List<ByteBuffer> byteBuffers = lease.getByteBuffers();
if (byteBuffers.isEmpty())
{
complete();
return Action.IDLE;
}
if (LOG.isDebugEnabled())
LOG.debug("Writing {} buffers ({} bytes) for {} frames {}", byteBuffers.size(), lease.getTotalLength(), actives.size(), actives);
session.getEndPoint().write(this, byteBuffers.toArray(new ByteBuffer[byteBuffers.size()]));
@ -251,17 +201,45 @@ public class HTTP2Flusher extends IteratingCallback
@Override
public void succeeded()
{
lease.recycle();
if (LOG.isDebugEnabled())
LOG.debug("Written {} frames for {}", actives.size(), actives);
actives.forEach(Entry::succeeded);
actives.clear();
complete();
super.succeeded();
}
private void complete()
{
lease.recycle();
actives.forEach(Entry::complete);
if (stalled != null)
{
// We have written part of the frame, but there is more to write.
// The API will not allow to send two data frames for the same
// stream so we append the unfinished frame at the end to allow
// better interleaving with other streams.
int index = actives.indexOf(stalled);
for (int i = index; i < actives.size(); ++i)
{
Entry entry = actives.get(i);
if (entry.dataRemaining() > 0)
append(entry);
}
for (int i = 0; i < index; ++i)
{
Entry entry = actives.get(i);
if (entry.dataRemaining() > 0)
append(entry);
}
stalled = null;
}
actives.clear();
}
@Override
protected void onCompleteSuccess()
{
@ -317,6 +295,7 @@ public class HTTP2Flusher extends IteratingCallback
protected final Frame frame;
protected final IStream stream;
protected final Callback callback;
private boolean reset;
protected Entry(Frame frame, IStream stream, Callback callback)
{
@ -330,14 +309,14 @@ public class HTTP2Flusher extends IteratingCallback
return 0;
}
public Throwable generate(ByteBufferPool.Lease lease)
{
return null;
}
protected abstract boolean generate(ByteBufferPool.Lease lease);
public void reset()
private void complete()
{
failed(new EofException("reset"));
if (reset)
failed(new EofException("reset"));
else
succeeded();
}
@Override
@ -351,7 +330,12 @@ public class HTTP2Flusher extends IteratingCallback
callback.failed(x);
}
public boolean isProtocol()
private boolean reset()
{
return this.reset = stream != null && stream.isReset() && !isProtocol();
}
private boolean isProtocol()
{
switch (frame.getType())
{

View File

@ -1048,22 +1048,13 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
super(frame, stream, callback);
}
public Throwable generate(ByteBufferPool.Lease lease)
protected boolean generate(ByteBufferPool.Lease lease)
{
try
{
generator.control(lease, frame);
if (LOG.isDebugEnabled())
LOG.debug("Generated {}", frame);
prepare();
return null;
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("Failure generating frame " + frame, x);
return x;
}
generator.control(lease, frame);
if (LOG.isDebugEnabled())
LOG.debug("Generated {}", frame);
prepare();
return true;
}
/**
@ -1154,71 +1145,58 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
private class DataEntry extends HTTP2Flusher.Entry
{
private int length;
private int remaining;
private int generated;
private DataEntry(DataFrame frame, IStream stream, Callback callback)
{
super(frame, stream, callback);
}
@Override
public int dataRemaining()
{
// We don't do any padding, so the flow control length is
// always the data remaining. This simplifies the handling
// of data frames that cannot be completely written due to
// the flow control window exhausting, since in that case
// we would have to count the padding only once.
return ((DataFrame)frame).remaining();
remaining = frame.remaining();
}
public Throwable generate(ByteBufferPool.Lease lease)
@Override
public int dataRemaining()
{
try
{
int flowControlLength = dataRemaining();
return remaining;
}
int sessionSendWindow = getSendWindow();
if (sessionSendWindow < 0)
throw new IllegalStateException();
protected boolean generate(ByteBufferPool.Lease lease)
{
int toWrite = dataRemaining();
int streamSendWindow = stream.updateSendWindow(0);
if (streamSendWindow < 0)
throw new IllegalStateException();
int sessionSendWindow = getSendWindow();
int streamSendWindow = stream.updateSendWindow(0);
int window = Math.min(streamSendWindow, sessionSendWindow);
if (window <= 0 && toWrite > 0)
return false;
int window = Math.min(streamSendWindow, sessionSendWindow);
int length = Math.min(toWrite, window);
int length = this.length = Math.min(flowControlLength, window);
if (LOG.isDebugEnabled())
LOG.debug("Generated {}, length/window={}/{}", frame, length, window);
int generated = generator.data(lease, (DataFrame)frame, length);
if (LOG.isDebugEnabled())
LOG.debug("Generated {}, length/window/data={}/{}/{}", frame, generated, window, toWrite);
generator.data(lease, (DataFrame)frame, length);
flowControl.onDataSending(stream, length);
return null;
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("Failure generating frame " + frame, x);
return x;
}
this.generated += generated;
this.remaining -= generated;
flowControl.onDataSending(stream, generated);
return true;
}
@Override
public void succeeded()
{
flowControl.onDataSent(stream, length);
flowControl.onDataSent(stream, generated);
generated = 0;
// Do we have more to send ?
DataFrame dataFrame = (DataFrame)frame;
if (dataFrame.remaining() > 0)
{
// We have written part of the frame, but there is more to write.
// The API will not allow to send two data frames for the same
// stream so we append the unfinished frame at the end to allow
// better interleaving with other streams.
flusher.append(this);
}
else
if (dataRemaining() == 0)
{
// Only now we can update the close state
// and eventually remove the stream.
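
The reworked DataEntry above generates only as many bytes as both flow-control windows allow and remembers what remains for a later pass. A standalone sketch of just that window arithmetic (a hypothetical helper, not the Jetty class):

// Sketch of the window arithmetic used by DataEntry.generate(...) above:
// a DATA frame may only carry as many bytes as both the stream window and
// the session window allow.
public final class FlowControlMath
{
    private FlowControlMath() {}

    /**
     * @param dataRemaining bytes of the DATA frame still to be sent
     * @param sessionSendWindow current HTTP/2 session send window
     * @param streamSendWindow current stream send window
     * @return how many bytes may be generated now (0 means the entry is stalled)
     */
    public static int sendableBytes(int dataRemaining, int sessionSendWindow, int streamSendWindow)
    {
        int window = Math.min(streamSendWindow, sessionSendWindow);
        if (window <= 0 && dataRemaining > 0)
            return 0; // stalled: wait for a WINDOW_UPDATE
        return Math.min(dataRemaining, window);
    }
}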

View File

@ -36,43 +36,34 @@ public class DataGenerator
this.headerGenerator = headerGenerator;
}
public void generate(ByteBufferPool.Lease lease, DataFrame frame, int maxLength)
public int generate(ByteBufferPool.Lease lease, DataFrame frame, int maxLength)
{
generateData(lease, frame.getStreamId(), frame.getData(), frame.isEndStream(), maxLength);
return generateData(lease, frame.getStreamId(), frame.getData(), frame.isEndStream(), maxLength);
}
public void generateData(ByteBufferPool.Lease lease, int streamId, ByteBuffer data, boolean last, int maxLength)
public int generateData(ByteBufferPool.Lease lease, int streamId, ByteBuffer data, boolean last, int maxLength)
{
if (streamId < 0)
throw new IllegalArgumentException("Invalid stream id: " + streamId);
int dataLength = data.remaining();
int maxFrameSize = headerGenerator.getMaxFrameSize();
if (dataLength <= maxLength && dataLength <= maxFrameSize)
int length = Math.min(dataLength, Math.min(maxFrameSize, maxLength));
if (length == dataLength)
{
// Single frame.
generateFrame(lease, streamId, data, last);
return;
}
// Other cases, we need to slice the original buffer into multiple frames.
int length = Math.min(maxLength, dataLength);
int frames = length / maxFrameSize;
if (frames * maxFrameSize != length)
++frames;
int begin = data.position();
int end = data.limit();
for (int i = 1; i <= frames; ++i)
else
{
int limit = begin + Math.min(maxFrameSize * i, length);
data.limit(limit);
int limit = data.limit();
int newLimit = data.position() + length;
data.limit(newLimit);
ByteBuffer slice = data.slice();
data.position(limit);
generateFrame(lease, streamId, slice, i == frames && last && limit == end);
data.position(newLimit);
data.limit(limit);
generateFrame(lease, streamId, slice, false);
}
data.limit(end);
return length;
}
private void generateFrame(ByteBufferPool.Lease lease, int streamId, ByteBuffer data, boolean last)
@ -88,6 +79,7 @@ public class DataGenerator
BufferUtil.flipToFlush(header, 0);
lease.append(header, true);
lease.append(data, false);
if (data.remaining() > 0)
lease.append(data, false);
}
}
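
generateData(...) above now carves at most one frame's worth of bytes off the front of the data buffer as a zero-copy slice and reports how much it consumed, leaving the caller to loop until everything is generated (as the updated DataGenerateParseTest does below). A standalone sketch of that slicing step, using plain java.nio only:

import java.nio.ByteBuffer;

// Standalone sketch of the zero-copy slicing done by DataGenerator.generateData(...):
// carve off at most maxLength bytes from the front of 'data' as a slice, advance
// 'data' past them, and leave the original limit untouched.
public final class SliceExample
{
    static ByteBuffer sliceFront(ByteBuffer data, int maxLength)
    {
        int length = Math.min(data.remaining(), maxLength);
        int limit = data.limit();
        int newLimit = data.position() + length;
        data.limit(newLimit);
        ByteBuffer slice = data.slice();
        data.position(newLimit);
        data.limit(limit);
        return slice;
    }

    public static void main(String[] args)
    {
        ByteBuffer data = ByteBuffer.wrap(new byte[]{1, 2, 3, 4, 5});
        ByteBuffer first = sliceFront(data, 2);   // bytes 1,2
        ByteBuffer rest = sliceFront(data, 100);  // bytes 3,4,5
        System.out.println(first.remaining() + " + " + rest.remaining());
    }
}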

View File

@ -80,8 +80,8 @@ public class Generator
generators[frame.getType().getType()].generate(lease, frame);
}
public void data(ByteBufferPool.Lease lease, DataFrame frame, int maxLength)
public int data(ByteBufferPool.Lease lease, DataFrame frame, int maxLength)
{
dataGenerator.generate(lease, frame, maxLength);
return dataGenerator.generate(lease, frame, maxLength);
}
}

View File

@ -103,7 +103,14 @@ public class DataGenerateParseTest
for (int i = 0; i < 2; ++i)
{
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool);
generator.generateData(lease, 13, data.slice(), true, data.remaining());
ByteBuffer slice = data.slice();
int generated = 0;
while (true)
{
generated += generator.generateData(lease, 13, slice, true, slice.remaining());
if (generated == data.remaining())
break;
}
frames.clear();
for (ByteBuffer buffer : lease.getByteBuffers())
@ -135,7 +142,14 @@ public class DataGenerateParseTest
{
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool);
ByteBuffer data = ByteBuffer.wrap(largeContent);
generator.generateData(lease, 13, data.slice(), true, data.remaining());
ByteBuffer slice = data.slice();
int generated = 0;
while (true)
{
generated += generator.generateData(lease, 13, slice, true, slice.remaining());
if (generated == data.remaining())
break;
}
frames.clear();
for (ByteBuffer buffer : lease.getByteBuffers())

View File

@ -19,6 +19,7 @@
package org.eclipse.jetty.http2.client.http;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Locale;
import org.eclipse.jetty.client.HttpChannel;
@ -34,6 +35,8 @@ import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.PushPromiseFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
public class HttpReceiverOverHTTP2 extends HttpReceiver implements Stream.Listener
@ -95,7 +98,42 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements Stream.Listen
return;
}
if (responseContent(exchange, frame.getData(), callback))
// We must copy the data since we do not know when the
// application will consume the bytes and the parsing
// will continue as soon as this method returns, eventually
// leading to reusing the underlying buffer for more reads.
ByteBufferPool byteBufferPool = getHttpDestination().getHttpClient().getByteBufferPool();
ByteBuffer original = frame.getData();
int length = original.remaining();
final ByteBuffer copy = byteBufferPool.acquire(length, original.isDirect());
BufferUtil.clearToFill(copy);
copy.put(original);
BufferUtil.flipToFlush(copy, 0);
Callback delegate = new Callback()
{
@Override
public boolean isNonBlocking()
{
return callback.isNonBlocking();
}
@Override
public void succeeded()
{
byteBufferPool.release(copy);
callback.succeeded();
}
@Override
public void failed(Throwable x)
{
byteBufferPool.release(copy);
callback.failed(x);
}
};
if (responseContent(exchange, copy, delegate))
{
if (frame.isEndStream())
responseSuccess(exchange);
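
The hunk above (and the matching one in HttpChannelOverHTTP2 further down) copies the frame's bytes into a pooled buffer and releases that buffer only when the wrapped callback completes. A minimal sketch of that copy-then-release pattern, assuming only the ByteBufferPool, BufferUtil and Callback calls already used in this diff:

import java.nio.ByteBuffer;

import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;

// Sketch of the copy-then-release pattern: the frame's bytes are copied into a
// pooled buffer so parsing can continue, and the pooled buffer is returned to
// the pool only when the consumer's callback completes.
public final class PooledCopy
{
    static ByteBuffer copy(ByteBufferPool pool, ByteBuffer original)
    {
        ByteBuffer copy = pool.acquire(original.remaining(), original.isDirect());
        BufferUtil.clearToFill(copy);
        copy.put(original);
        BufferUtil.flipToFlush(copy, 0);
        return copy;
    }

    static Callback releasing(ByteBufferPool pool, ByteBuffer copy, Callback delegate)
    {
        return new Callback()
        {
            @Override
            public void succeeded()
            {
                pool.release(copy);
                delegate.succeeded();
            }

            @Override
            public void failed(Throwable x)
            {
                pool.release(copy);
                delegate.failed(x);
            }
        };
    }
}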

View File

@ -164,15 +164,17 @@ public class HttpChannelOverHTTP2 extends HttpChannel
public Runnable requestContent(DataFrame frame, final Callback callback)
{
// We must copy the data since we do not know when the
// application will consume its bytes (we queue them by
// calling onContent()), and we cannot stop the parsing
// since there may be frames for other streams.
// application will consume the bytes (we queue them by
// calling onContent()), and the parsing will continue
// as soon as this method returns, eventually leading
// to reusing the underlying buffer for more reads.
final ByteBufferPool byteBufferPool = getByteBufferPool();
ByteBuffer original = frame.getData();
int length = original.remaining();
final ByteBuffer copy = byteBufferPool.acquire(length, original.isDirect());
BufferUtil.clearToFill(copy);
copy.put(original).flip();
copy.put(original);
BufferUtil.flipToFlush(copy, 0);
boolean handle = onContent(new HttpInput.Content(copy)
{

View File

@ -106,7 +106,7 @@ public class HttpTransportOverHTTP2 implements HttpTransport
public void succeeded()
{
if (LOG.isDebugEnabled())
LOG.debug("HTTP2 Response #{} committed", stream.getId());
LOG.debug("HTTP2 Response #{}/{} committed", stream.getId(), Integer.toHexString(stream.getSession().hashCode()));
send(content, lastContent, callback);
}
@ -114,7 +114,7 @@ public class HttpTransportOverHTTP2 implements HttpTransport
public void failed(Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("HTTP2 Response #" + stream.getId() + " failed to commit", x);
LOG.debug("HTTP2 Response #" + stream.getId() + "/" + Integer.toHexString(stream.getSession().hashCode()) + " failed to commit", x);
callback.failed(x);
}
});
@ -159,7 +159,7 @@ public class HttpTransportOverHTTP2 implements HttpTransport
}
if (LOG.isDebugEnabled())
LOG.debug("HTTP/2 Push {}",request);
LOG.debug("HTTP/2 Push {}", request);
stream.push(new PushPromiseFrame(stream.getId(), 0, request), new Promise<Stream>()
{
@ -182,8 +182,9 @@ public class HttpTransportOverHTTP2 implements HttpTransport
{
if (LOG.isDebugEnabled())
{
LOG.debug("HTTP2 Response #{}:{}{} {}{}{}",
stream.getId(), System.lineSeparator(), HttpVersion.HTTP_2, info.getStatus(),
LOG.debug("HTTP2 Response #{}/{}:{}{} {}{}{}",
stream.getId(), Integer.toHexString(stream.getSession().hashCode()),
System.lineSeparator(), HttpVersion.HTTP_2, info.getStatus(),
System.lineSeparator(), info.getFields());
}
@ -195,8 +196,9 @@ public class HttpTransportOverHTTP2 implements HttpTransport
{
if (LOG.isDebugEnabled())
{
LOG.debug("HTTP2 Response #{}: {} content bytes{}",
stream.getId(), content.remaining(), lastContent ? " (last chunk)" : "");
LOG.debug("HTTP2 Response #{}/{}: {} content bytes{}",
stream.getId(), Integer.toHexString(stream.getSession().hashCode()),
content.remaining(), lastContent ? " (last chunk)" : "");
}
DataFrame frame = new DataFrame(stream.getId(), content, lastContent);
stream.data(frame, callback);
@ -222,7 +224,8 @@ public class HttpTransportOverHTTP2 implements HttpTransport
{
IStream stream = this.stream;
if (LOG.isDebugEnabled())
LOG.debug("HTTP2 Response #{} aborted", stream == null ? -1 : stream.getId());
LOG.debug("HTTP2 Response #{}/{} aborted", stream == null ? -1 : stream.getId(),
stream == null ? -1 : Integer.toHexString(stream.getSession().hashCode()));
if (stream != null)
stream.reset(new ResetFrame(stream.getId(), ErrorCode.INTERNAL_ERROR.code), Callback.NOOP);
}
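
The logging changes above consistently tag messages with the stream id plus the session's hash code in hex, so that streams with the same id on different HTTP/2 connections can be told apart in the debug logs. A tiny illustrative helper (not Jetty code) showing the format:

// Illustration of the "#streamId/sessionHash" identifier used in the log
// messages above. Standalone helper, not part of the Jetty API.
public class Http2LogTag
{
    static String tag(int streamId, Object session)
    {
        return "#" + streamId + "/" + Integer.toHexString(session.hashCode());
    }

    public static void main(String[] args)
    {
        Object session = new Object();
        System.out.println("HTTP2 Response " + tag(3, session) + " committed");
    }
}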

View File

@ -97,6 +97,9 @@ public class InfinispanSessionDataStore extends AbstractSessionDataStore
try
{
if (LOG.isDebugEnabled())
LOG.debug("Loading session {} from infinispan", id);
SessionData sd = (SessionData)_cache.get(getCacheKey(id, _context));
reference.set(sd);
}
@ -201,9 +204,24 @@ public class InfinispanSessionDataStore extends AbstractSessionDataStore
@Override
public boolean isPassivating()
{
return true;
//TODO run in the _context to ensure classloader is set
try
{
Class<?> remoteClass = Thread.currentThread().getContextClassLoader().loadClass("org.infinispan.client.hotrod.RemoteCache");
if (_cache.getClass().isAssignableFrom(remoteClass))
{
return true;
}
return false;
}
catch (ClassNotFoundException e)
{
return false;
}
}
public void setInfinispanIdleTimeoutSec (int sec)
{
_infinispanIdleTimeoutSec = sec;

View File

@ -293,24 +293,7 @@ public class InfinispanSessionIdManager extends AbstractSessionIdManager
return delete (id);
}
/* ------------------------------------------------------------ */
/**
* Remove an id from use by telling all contexts to remove a session with this id.
*
* @see org.eclipse.jetty.server.SessionIdManager#expireAll(java.lang.String)
*/
@Override
public void expireAll(String id)
{
LOG.debug("Expiring "+id);
//take the id out of the list of known sessionids for this node - TODO consider if we didn't remove it from this node
//it is because infinispan probably already timed it out. So, we only want to expire it from memory and NOT load it if present
removeId(id);
//tell all contexts that may have a session object with this id to
//get rid of them
for (SessionManager manager:getSessionManagers())
{
manager.invalidate(id);
}
}
}

View File

@ -127,8 +127,8 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
@Override
public void run()
{
getFillInterest().fillable();
getWriteFlusher().completeWrite();
getFillInterest().fillable();
}
};

View File

@ -1257,12 +1257,12 @@ public class HttpOutput extends ServletOutputStream implements Runnable
}
// Read from stream until buffer full or EOF
_buffer.clear();
BufferUtil.clearToFill(_buffer);
while (_buffer.hasRemaining() && !_eof)
_eof = (_in.read(_buffer)) < 0;
// write what we have
_buffer.flip();
BufferUtil.flipToFlush(_buffer, 0);
write(_buffer,_eof,this);
return Action.SCHEDULED;

View File

@ -168,7 +168,7 @@ public class Request implements HttpServletRequest
private String _pathInfo;
private boolean _secure;
private boolean _asyncSupported = true;
private String _asyncNotSupportedSource = null;
private boolean _newContext;
private boolean _cookiesExtracted = false;
private boolean _handled = false;
@ -1668,7 +1668,7 @@ public class Request implements HttpServletRequest
@Override
public boolean isAsyncSupported()
{
return _asyncSupported;
return _asyncNotSupportedSource==null;
}
/* ------------------------------------------------------------ */
@ -1844,7 +1844,7 @@ public class Request implements HttpServletRequest
if (_async!=null)
_async.reset();
_async=null;
_asyncSupported = true;
_asyncNotSupportedSource = null;
_handled = false;
if (_attributes != null)
_attributes.clearAttributes();
@ -1914,9 +1914,9 @@ public class Request implements HttpServletRequest
}
/* ------------------------------------------------------------ */
public void setAsyncSupported(boolean supported)
public void setAsyncSupported(boolean supported,String source)
{
_asyncSupported = supported;
_asyncNotSupportedSource = supported?null:(source==null?"unknown":source);
}
/* ------------------------------------------------------------ */
@ -2236,8 +2236,8 @@ public class Request implements HttpServletRequest
@Override
public AsyncContext startAsync() throws IllegalStateException
{
if (!_asyncSupported)
throw new IllegalStateException("!asyncSupported");
if (_asyncNotSupportedSource!=null)
throw new IllegalStateException("!asyncSupported: "+_asyncNotSupportedSource);
HttpChannelState state = getHttpChannelState();
if (_async==null)
_async=new AsyncContextState(state);
@ -2250,8 +2250,8 @@ public class Request implements HttpServletRequest
@Override
public AsyncContext startAsync(ServletRequest servletRequest, ServletResponse servletResponse) throws IllegalStateException
{
if (!_asyncSupported)
throw new IllegalStateException("!asyncSupported");
if (_asyncNotSupportedSource!=null)
throw new IllegalStateException("!asyncSupported: "+_asyncNotSupportedSource);
HttpChannelState state = getHttpChannelState();
if (_async==null)
_async=new AsyncContextState(state);
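
Request now records which component disabled async support instead of keeping a bare boolean, and reports that source in the IllegalStateException. A small standalone sketch of the same idiom, with hypothetical names:

// Sketch of the "remember who disabled it" idiom introduced above: instead of
// a boolean, keep the source that turned the feature off so the exception
// message can say why. Names here are illustrative, not the Jetty fields.
public class FeatureGate
{
    private String notSupportedSource; // null means supported

    public void setSupported(boolean supported, String source)
    {
        notSupportedSource = supported ? null : (source == null ? "unknown" : source);
    }

    public boolean isSupported()
    {
        return notSupportedSource == null;
    }

    public void check()
    {
        if (notSupportedSource != null)
            throw new IllegalStateException("!asyncSupported: " + notSupportedSource);
    }
}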

View File

@ -55,7 +55,7 @@ public abstract class AbstractSessionIdManager extends AbstractLifeCycle impleme
protected String _workerAttr;
protected long _reseed=100000L;
protected Server _server;
protected SessionScavenger _scavenger;
protected PeriodicSessionInspector _scavenger;
/* ------------------------------------------------------------ */
/**
@ -102,7 +102,7 @@ public abstract class AbstractSessionIdManager extends AbstractLifeCycle impleme
/**
* @param scavenger a SessionScavenger
*/
public void setSessionScavenger (SessionScavenger scavenger)
public void setSessionScavenger (PeriodicSessionInspector scavenger)
{
_scavenger = scavenger;
_scavenger.setSessionIdManager(this);
@ -285,7 +285,7 @@ public abstract class AbstractSessionIdManager extends AbstractLifeCycle impleme
if (_scavenger == null)
{
LOG.warn("No SessionScavenger set, using defaults");
_scavenger = new SessionScavenger();
_scavenger = new PeriodicSessionInspector();
_scavenger.setSessionIdManager(this);
}
@ -388,17 +388,14 @@ public abstract class AbstractSessionIdManager extends AbstractLifeCycle impleme
//session data store, AND have listeners called on them.
//BUT want to avoid loading into memory sessions that this node is not managing (eg have 3 nodes all running session mgrs,
//all 3 find the expired session and load it into memory and expire it
if (removeId(id))
removeId(id);
//tell all contexts that may have a session object with this id to
//get rid of them
for (SessionManager manager:getSessionManagers())
{
//tell all contexts that may have a session object with this id to
//get rid of them
for (SessionManager manager:getSessionManagers())
{
manager.invalidate(id);
}
manager.invalidate(id);
}
else if (LOG.isDebugEnabled())
LOG.debug("Not present in idmgr: {}", id);
}
/* ------------------------------------------------------------ */
@ -408,14 +405,13 @@ public abstract class AbstractSessionIdManager extends AbstractLifeCycle impleme
public void invalidateAll (String id)
{
//take the id out of the list of known sessionids for this node
if (removeId(id))
removeId(id);
//tell all contexts that may have a session object with this id to
//get rid of them
for (SessionManager manager:getSessionManagers())
{
//tell all contexts that may have a session object with this id to
//get rid of them
for (SessionManager manager:getSessionManagers())
{
manager.invalidate(id);
}
manager.invalidate(id);
}
}

View File

@ -20,10 +20,14 @@
package org.eclipse.jetty.server.session;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Stream;
import javax.servlet.http.HttpServletRequest;
import org.eclipse.jetty.util.MultiException;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -43,8 +47,9 @@ public abstract class AbstractSessionStore extends AbstractLifeCycle implements
protected StalenessStrategy _staleStrategy;
protected SessionManager _manager;
protected SessionContext _context;
protected int _idlePassivationTimeoutSec;
private IdleInspector _idleInspector;
private ExpiryInspector _expiryInspector;
/**
@ -95,15 +100,6 @@ public abstract class AbstractSessionStore extends AbstractLifeCycle implements
/**
* Get a list of keys for sessions that the store thinks has expired
* @return ids of all Session objects that might have expired
*/
public abstract Set<String> doGetExpiredCandidates();
/**
*
*/
@ -129,7 +125,6 @@ public abstract class AbstractSessionStore extends AbstractLifeCycle implements
}
/**
* @see org.eclipse.jetty.server.session.SessionStore#initialize(org.eclipse.jetty.server.session.SessionContext)
*/
@ -158,6 +153,8 @@ public abstract class AbstractSessionStore extends AbstractLifeCycle implements
_sessionDataStore.initialize(_context);
_sessionDataStore.start();
_expiryInspector = new ExpiryInspector(this, _manager.getSessionIdManager());
super.doStart();
}
@ -168,6 +165,7 @@ public abstract class AbstractSessionStore extends AbstractLifeCycle implements
protected void doStop() throws Exception
{
_sessionDataStore.stop();
_expiryInspector = null;
super.doStop();
}
@ -204,6 +202,30 @@ public abstract class AbstractSessionStore extends AbstractLifeCycle implements
}
/**
* @see org.eclipse.jetty.server.session.SessionStore#getIdlePassivationTimeoutSec()
*/
public int getIdlePassivationTimeoutSec()
{
return _idlePassivationTimeoutSec;
}
/**
* @see org.eclipse.jetty.server.session.SessionStore#setIdlePassivationTimeoutSec(int)
*/
public void setIdlePassivationTimeoutSec(int idleTimeoutSec)
{
_idlePassivationTimeoutSec = idleTimeoutSec;
if (_idlePassivationTimeoutSec == 0)
_idleInspector = null;
else if (_idleInspector == null)
_idleInspector = new IdleInspector(this);
}
/**
* Get a session object.
*
@ -219,28 +241,47 @@ public abstract class AbstractSessionStore extends AbstractLifeCycle implements
//look locally
Session session = doGet(id);
//TODO also check that session is only written out if only the access time changes infrequently
if (staleCheck && isStale(session))
{
//delete from session store so should reload from session data store
doDelete(id);
session = null;
}
//not in session store, load the data for the session if possible
if (session == null && _sessionDataStore != null)
//session is either not in session store, or it is stale, or its been passivated, load the data for the session if possible
if (session == null || (staleCheck && isStale(session)) || session.isPassivated() && _sessionDataStore != null)
{
SessionData data = _sessionDataStore.load(id);
if (data != null)
//session wasn't in session store
if (session == null)
{
session = newSession(data);
session.setSessionManager(_manager);
Session existing = doPutIfAbsent(id, session);
if (existing != null)
if (data != null)
{
//some other thread has got in first and added the session
//so use it
session = existing;
session = newSession(data);
session.setSessionManager(_manager);
Session existing = doPutIfAbsent(id, session);
if (existing != null)
{
//some other thread has got in first and added the session
//so use it
session = existing;
}
}
//else session not in store and not in data store either, so doesn't exist
}
else
{
//session was already in session store, refresh it if its still stale/passivated
try (Lock lock = session.lock())
{
if (session.isPassivated() || staleCheck && isStale(session))
{
//if we were able to load it, then update our session object
if (data != null)
{
session.setPassivated(false);
session.getSessionData().copy(data);
session.didActivate();
}
else
session = null; //TODO rely on the expiry mechanism to get rid of it?
}
}
}
}
@ -303,11 +344,39 @@ public abstract class AbstractSessionStore extends AbstractLifeCycle implements
/**
* Remove a session object from this store and from any backing store.
*
* If session has been passivated, may need to reload it before it can
* be properly deleted
*
* @see org.eclipse.jetty.server.session.SessionStore#delete(java.lang.String)
*/
@Override
public Session delete(String id) throws Exception
{
//Ensure that the session object is not passivated so that its attributes
//are valid
Session session = doGet(id);
//TODO if (session == null) do we want to load it to delete it?
if (session != null)
{
try (Lock lock = session.lock())
{
//TODO don't check stale on deletion?
if (session.isPassivated() && _sessionDataStore != null)
{
session.setPassivated(false);
SessionData data = _sessionDataStore.load(id);
if (data != null)
{
session.getSessionData().copy(data);
session.didActivate();
}
}
}
}
//Always delete it from the data store
if (_sessionDataStore != null)
{
boolean dsdel = _sessionDataStore.delete(id);
@ -331,20 +400,94 @@ public abstract class AbstractSessionStore extends AbstractLifeCycle implements
/**
* @see org.eclipse.jetty.server.session.SessionStore#getExpired()
* @see org.eclipse.jetty.server.session.SessionStore#checkExpiry(java.util.Set)
*/
@Override
public Set<String> getExpired()
public Set<String> checkExpiration(Set<String> candidates)
{
if (!isStarted())
return Collections.emptySet();
Set<String> candidates = doGetExpiredCandidates();
if (LOG.isDebugEnabled())
LOG.debug("SessionStore checking expiration on {}", candidates);
return _sessionDataStore.getExpired(candidates);
}
/**
* @see org.eclipse.jetty.server.session.SessionStore#inspect()
*/
public void inspect ()
{
Stream<Session> stream = getStream();
try
{
_expiryInspector.preInspection();
if (_idleInspector != null)
_idleInspector.preInspection();
stream.forEach(s->{_expiryInspector.inspect(s); if (_idleInspector != null) _idleInspector.inspect(s);});
_expiryInspector.postInspection();
_idleInspector.postInspection();
}
finally
{
stream.close();
}
}
/**
* If the SessionDataStore supports passivation, passivate any
* sessions that have not been accessed for longer than the idle passivation timeout
*
* @param id identity of session to passivate
*/
public void passivateIdleSession(String id)
{
if (!isStarted())
return;
if (_sessionDataStore == null)
return; //no data store to passivate
if (!_sessionDataStore.isPassivating())
return; //doesn't support passivation
//get the session locally
Session s = doGet(id);
if (s == null)
{
LOG.warn("Session {} not in this session store", s);
return;
}
try (Lock lock = s.lock())
{
//check the session is still idle first
if (s.isValid() && s.isIdleLongerThan(_idlePassivationTimeoutSec))
{
s.willPassivate();
_sessionDataStore.store(id, s.getSessionData());
s.getSessionData().clearAllAttributes();
s.getSessionData().setDirty(false);
}
}
catch (Exception e)
{
LOG.warn("Passivation of idle session {} failed", id, e);
// TODO should do session.invalidate(); ???
}
}
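willPassivate()/didActivate() give session attributes a chance to react before their state is written out and after it is restored: under the standard servlet contract, attribute values that implement javax.servlet.http.HttpSessionActivationListener receive these callbacks. A minimal sketch of such an attribute; the class itself is illustrative and not part of this patch.

    // Sketch: a session attribute that reacts to passivation/activation.
    // Uses only the standard servlet API; the class is hypothetical.
    public class CachedLookup implements javax.servlet.http.HttpSessionActivationListener,
                                         java.io.Serializable
    {
        private transient Object handle; // not persisted with the session data

        @Override
        public void sessionWillPassivate(javax.servlet.http.HttpSessionEvent se)
        {
            handle = null; // drop non-serializable state before attributes are stored
        }

        @Override
        public void sessionDidActivate(javax.servlet.http.HttpSessionEvent se)
        {
            handle = new Object(); // placeholder: re-acquire state after the session is reloaded
        }
    }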
/**
@ -355,5 +498,4 @@ public abstract class AbstractSessionStore extends AbstractLifeCycle implements
{
return null;
}
}


@ -0,0 +1,114 @@
//
// ========================================================================
// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.server.session;
import java.util.HashSet;
import java.util.Set;
import org.eclipse.jetty.server.SessionIdManager;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
/**
* ExpiryInspector
*
* Inspects each session for expiry; after the inspection pass, confirmed
* candidates are expired across all contexts via the SessionIdManager.
*/
public class ExpiryInspector implements SessionInspector
{
private final static Logger LOG = Log.getLogger("org.eclipse.jetty.server.session");
protected Set<String> _expiryCandidates;
protected SessionIdManager _idManager;
protected AbstractSessionStore _sessionStore;
/**
* @param sessionStore the session store whose sessions are inspected
* @param idManager the SessionIdManager used to expire confirmed candidates
*/
public ExpiryInspector (AbstractSessionStore sessionStore, SessionIdManager idManager)
{
_idManager = idManager;
_sessionStore = sessionStore;
}
/**
* @see org.eclipse.jetty.server.session.SessionInspector#inspect(org.eclipse.jetty.server.session.Session)
*/
@Override
public void inspect(Session s)
{
//Does the session object think it is expired?
long now = System.currentTimeMillis();
if (s.isExpiredAt(now))
_expiryCandidates.add(s.getId());
}
/**
* @return the expiryCandidates
*/
public Set<String> getExpiryCandidates()
{
return _expiryCandidates;
}
/**
* @see org.eclipse.jetty.server.session.SessionInspector#preInspection()
*/
@Override
public void preInspection()
{
if (LOG.isDebugEnabled())
LOG.debug("PreInspection");
_expiryCandidates = new HashSet<String>();
}
/**
* @see org.eclipse.jetty.server.session.SessionInspector#postInspection()
*/
@Override
public void postInspection()
{
if (LOG.isDebugEnabled())
LOG.debug("ExpiryInspector checking expiration for {}", _expiryCandidates);
try
{
Set<String> candidates = _sessionStore.checkExpiration(_expiryCandidates);
for (String id:candidates)
{
_idManager.expireAll(id);
}
}
catch (Exception e)
{
LOG.warn(e);
}
finally
{
_expiryCandidates = null;
}
}
}


@ -117,6 +117,8 @@ public class FileSessionDataStore extends AbstractSessionDataStore
public Set<String> getExpired(Set<String> candidates)
{
//we don't want to open up each file and check, so just leave it up to the SessionStore
//TODO as the session manager is likely to be a lazy loader, if a session is never requested, its
//file will stay forever after a restart
return candidates;
}


@ -0,0 +1,108 @@
//
// ========================================================================
// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.server.session;
import java.util.HashSet;
import java.util.Set;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
/**
* IdleInspector
*
* Checks if a session is idle
*/
public class IdleInspector implements SessionInspector
{
private final static Logger LOG = Log.getLogger("org.eclipse.jetty.server.session");
protected Set<String> _idleCandidates;
protected AbstractSessionStore _sessionStore;
/**
* @param sessionStore the session store whose sessions are inspected
*/
public IdleInspector (AbstractSessionStore sessionStore)
{
_sessionStore = sessionStore;
}
/**
* @see org.eclipse.jetty.server.session.SessionInspector#inspect(org.eclipse.jetty.server.session.Session)
*/
@Override
public void inspect(Session s)
{
        //Skip sessions that are already expired; expiry is handled by the ExpiryInspector
long now = System.currentTimeMillis();
if (s.isExpiredAt(now))
return;
if (s.isValid() && s.isIdleLongerThan(_sessionStore.getIdlePassivationTimeoutSec()))
{
_idleCandidates.add(s.getId());
        }
}
/**
* @return the idleCandidates
*/
public Set<String> getIdleCandidates()
{
return _idleCandidates;
}
/**
* @see org.eclipse.jetty.server.session.SessionInspector#preInspection()
*/
@Override
public void preInspection()
{
_idleCandidates = new HashSet<String>();
}
/**
* @see org.eclipse.jetty.server.session.SessionInspector#postInspection()
*/
@Override
public void postInspection()
{
for (String id:_idleCandidates)
{
try
{
_sessionStore.passivateIdleSession(id);
}
catch (Exception e)
{
LOG.warn(e);
}
}
_idleCandidates = null;
}
}
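Idle passivation is opt-in: it only runs when the store has an idle timeout configured and its SessionDataStore supports passivation. A short configuration sketch; `store` stands for any started AbstractSessionStore subclass, and the timeout value and session id are illustrative.

    // Sketch: configuring idle passivation. setIdlePassivationTimeoutSec comes
    // from the SessionStore interface; the values are illustrative.
    store.setIdlePassivationTimeoutSec(30 * 60);   // treat sessions as idle after 30 minutes
    // Later, typically from IdleInspector.postInspection() for each candidate:
    store.passivateIdleSession("node0abc123");     // re-checks idleness under the session lock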


@ -50,7 +50,7 @@ public class JDBCSessionIdManager extends org.eclipse.jetty.server.session.Abstr
protected final HashSet<String> _sessionIds = new HashSet<String>();
protected Server _server;
protected SessionScavenger _scavenger;
protected PeriodicSessionInspector _scavenger;
private DatabaseAdaptor _dbAdaptor = new DatabaseAdaptor();


@ -22,6 +22,7 @@ package org.eclipse.jetty.server.session;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;
import javax.servlet.http.HttpServletRequest;
@ -150,7 +151,7 @@ public class MemorySessionStore extends AbstractSessionStore
}
/*
@Override
public Set<String> doGetExpiredCandidates()
@ -167,7 +168,7 @@ public class MemorySessionStore extends AbstractSessionStore
}
return candidates;
}
*/
@ -242,4 +243,14 @@ public class MemorySessionStore extends AbstractSessionStore
}
/**
* @see org.eclipse.jetty.server.session.SessionStore#getStream()
*/
@Override
public Stream<Session> getStream()
{
return _sessions.values().stream();
}
}


@ -35,25 +35,26 @@ import org.eclipse.jetty.util.thread.Scheduler;
* There is 1 periodic session inspector per SessionIdManager/Server instance.
*
*/
public class SessionScavenger extends AbstractLifeCycle
public class PeriodicSessionInspector extends AbstractLifeCycle
{
private final static Logger LOG = Log.getLogger("org.eclipse.jetty.server.session");
public static final long DEFAULT_SCAVENGE_MS = 1000L * 60 * 10;
public static final long DEFAULT_PERIOD_MS = 1000L * 60 * 10;
protected SessionIdManager _sessionIdManager;
protected Scheduler _scheduler;
protected Scheduler.Task _task; //scavenge task
protected ScavengerRunner _runner;
protected Runner _runner;
protected boolean _ownScheduler = false;
private long _scavengeIntervalMs = DEFAULT_SCAVENGE_MS;
private long _intervalMs = DEFAULT_PERIOD_MS;
private long _lastTime = 0L;
/**
* ScavengerRunner
* Runner
*
*/
protected class ScavengerRunner implements Runnable
protected class Runner implements Runnable
{
@Override
@ -61,12 +62,12 @@ public class SessionScavenger extends AbstractLifeCycle
{
try
{
scavenge();
inspect();
}
finally
{
if (_scheduler != null && _scheduler.isRunning())
_task = _scheduler.schedule(this, _scavengeIntervalMs, TimeUnit.MILLISECONDS);
_task = _scheduler.schedule(this, _intervalMs, TimeUnit.MILLISECONDS);
}
}
}
@ -96,6 +97,7 @@ public class SessionScavenger extends AbstractLifeCycle
if (!(_sessionIdManager instanceof AbstractSessionIdManager))
throw new IllegalStateException ("SessionIdManager is not an AbstractSessionIdManager");
_lastTime = System.currentTimeMillis(); //set it to a non zero value
//try and use a common scheduler, fallback to own
_scheduler = ((AbstractSessionIdManager)_sessionIdManager).getServer().getBean(Scheduler.class);
@ -109,7 +111,7 @@ public class SessionScavenger extends AbstractLifeCycle
else if (!_scheduler.isStarted())
throw new IllegalStateException("Shared scheduler not started");
setScavengeIntervalSec(getScavengeIntervalSec());
setIntervalSec(getIntervalSec());
super.doStart();
}
@ -138,24 +140,24 @@ public class SessionScavenger extends AbstractLifeCycle
* Set the period between inspection cycles
* @param sec the period in seconds between inspection cycles
*/
public void setScavengeIntervalSec (long sec)
public void setIntervalSec (long sec)
{
if (sec<=0)
sec=60;
long old_period=_scavengeIntervalMs;
long old_period=_intervalMs;
long period=sec*1000L;
_scavengeIntervalMs=period;
_intervalMs=period;
//add a bit of variability into the inspection interval so that not all
//nodes with the same interval sync up
long tenPercent = _scavengeIntervalMs/10;
long tenPercent = _intervalMs/10;
if ((System.currentTimeMillis()%2) == 0)
_scavengeIntervalMs += tenPercent;
_intervalMs += tenPercent;
if (LOG.isDebugEnabled())
LOG.debug("Scavenging every "+_scavengeIntervalMs+" ms");
LOG.debug("Inspecting every "+_intervalMs+" ms");
synchronized (this)
{
@ -164,8 +166,8 @@ public class SessionScavenger extends AbstractLifeCycle
if (_task!=null)
_task.cancel();
if (_runner == null)
_runner = new ScavengerRunner();
_task = _scheduler.schedule(_runner,_scavengeIntervalMs,TimeUnit.MILLISECONDS);
_runner = new Runner();
_task = _scheduler.schedule(_runner,_intervalMs,TimeUnit.MILLISECONDS);
}
}
}
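The ten percent skew keeps cluster nodes that share the same configuration from inspecting in lock-step. A standalone sketch that mirrors the computation above, with a worked value; the method is illustrative only.

    // Sketch mirroring the interval skew computation (illustrative, standalone).
    static long skewedIntervalMs(long sec)
    {
        if (sec <= 0)
            sec = 60;
        long period = sec * 1000L;                   // e.g. 600s -> 600_000 ms
        long tenPercent = period / 10;               // 60_000 ms
        if ((System.currentTimeMillis() % 2) == 0)
            period += tenPercent;                    // roughly half the nodes end up at 660_000 ms
        return period;                               // the rest stay at 600_000 ms
    }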
@ -173,13 +175,13 @@ public class SessionScavenger extends AbstractLifeCycle
/**
* Get the period between scavenge cycles.
* Get the period between inspection cycles.
*
* @return the period in seconds between inspection cycles
*/
public long getScavengeIntervalSec ()
public long getIntervalSec ()
{
return _scavengeIntervalMs/1000;
return _intervalMs/1000;
}
@ -189,26 +191,33 @@ public class SessionScavenger extends AbstractLifeCycle
* ask all SessionManagers to find sessions they think have expired and then make
* sure that a session sharing the same id is expired on all contexts
*/
public void scavenge ()
public void inspect ()
{
        //don't attempt to inspect if we are shutting down
if (isStopping() || isStopped())
return;
if (LOG.isDebugEnabled())
LOG.debug("Scavenging sessions");
LOG.debug("Inspecting sessions");
long now = System.currentTimeMillis();
//find the session managers
for (SessionManager manager:((AbstractSessionIdManager)_sessionIdManager).getSessionManagers())
{
if (manager != null)
{
manager.inspect();
/*
//call scavenge on each manager to find keys for sessions that have expired
Set<String> expiredKeys = manager.scavenge();
//for each expired session, tell the session id manager to invalidate its key on all contexts
for (String key:expiredKeys)
{
// if it recently expired
try
{
((AbstractSessionIdManager)_sessionIdManager).expireAll(key);
@ -217,7 +226,7 @@ public class SessionScavenger extends AbstractLifeCycle
{
LOG.warn(e);
}
}
}*/
}
}
}
@ -229,7 +238,7 @@ public class SessionScavenger extends AbstractLifeCycle
@Override
public String toString()
{
return super.toString()+"[interval="+_scavengeIntervalMs+", ownscheduler="+_ownScheduler+"]";
return super.toString()+"[interval="+_intervalMs+", ownscheduler="+_ownScheduler+"]";
}
}


@ -66,6 +66,9 @@ public class Session implements SessionManager.SessionIf
private State _state = State.VALID; //state of the session:valid,invalid or being invalidated
private Locker _lock = new Locker();
private boolean _isPassivated;
public Session (HttpServletRequest request, SessionData data)
{
_sessionData = data;
@ -147,6 +150,17 @@ public class Session implements SessionManager.SessionIf
}
}
    /**
     * @param sec the idle period in seconds
     * @return true if this session has not been accessed for longer than sec seconds
     */
    protected boolean isIdleLongerThan (int sec)
    {
        long now = System.currentTimeMillis();
        try (Lock lock = _lock.lockIfNotHeld())
        {
            //use a long multiplication to avoid int overflow for very large timeouts
            return ((_sessionData.getAccessed() + (sec * 1000L)) < now);
        }
    }
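As a worked example with illustrative numbers: if the session was last accessed at t = 1,000,000 ms and sec = 300, the check above treats it as idle once now exceeds 1,000,000 + 300 * 1000 = 1,300,000 ms, that is, five minutes after the last access.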
/* ------------------------------------------------------------ */
/**
* @param nodename
@ -520,7 +534,7 @@ public class Session implements SessionManager.SessionIf
public void setAttribute(String name, Object value)
{
Object old=null;
try (Lock lock = lock())
try (Lock lock = _lock.lockIfNotHeld())
{
//if session is not valid, don't accept the set
checkValidForWrite();
@ -578,7 +592,7 @@ public class Session implements SessionManager.SessionIf
String id = null;
String extendedId = null;
try (Lock lock = lock())
try (Lock lock = _lock.lockIfNotHeld())
{
checkValidForWrite(); //don't renew id on a session that is not valid
id = _sessionData.getId(); //grab the values as they are now
@ -601,7 +615,7 @@ public class Session implements SessionManager.SessionIf
*/
public void renewId (String oldId, String oldExtendedId, String newId, String newExtendedId)
{
try (Lock lock = lock())
try (Lock lock = _lock.lockIfNotHeld())
{
checkValidForWrite(); //can't change id on invalid session
@ -632,7 +646,7 @@ public class Session implements SessionManager.SessionIf
boolean result = false;
try (Lock lock = lock())
try (Lock lock = _lock.lockIfNotHeld())
{
switch (_state)
{
@ -687,7 +701,7 @@ public class Session implements SessionManager.SessionIf
/* ------------------------------------------------------------- */
protected void doInvalidate() throws IllegalStateException
{
try (Lock lock = lock())
try (Lock lock = _lock.lockIfNotHeld())
{
try
{
@ -769,4 +783,26 @@ public class Session implements SessionManager.SessionIf
{
return _sessionData;
}
/* ------------------------------------------------------------- */
/**
* @return true if this session has been passivated to the SessionDataStore
*/
public boolean isPassivated()
{
return _isPassivated;
}
/* ------------------------------------------------------------- */
/**
* @param isPassivated true if the session has been passivated
*/
public void setPassivated(boolean isPassivated)
{
_isPassivated = isPassivated;
}
}
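The changes above switch several methods from lock() to _lock.lockIfNotHeld() so they can be called both directly and from callers that already hold the session lock. A sketch of the pattern, using the same Locker type referenced in this class; the `outer`/`inner` method names are illustrative.

    // Sketch of the locking pattern used above; `outer` and `inner` are illustrative.
    private final Locker _lock = new Locker();

    void outer()
    {
        try (Locker.Lock lock = _lock.lock())            // acquires the lock
        {
            inner();                                     // nested call will not try to re-lock
        }
    }

    void inner()
    {
        try (Locker.Lock lock = _lock.lockIfNotHeld())   // no-op if this thread already holds it
        {
            // ... mutate session state ...
        }
    }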


@ -84,6 +84,36 @@ public class SessionData implements Serializable
}
/**
* Copy the info from the given SessionData
*
* @param data the SessionData to be copied
*/
public void copy (SessionData data)
{
if (data == null)
return; //don't copy if no data
if (data.getId() == null || !(getId().equals(data.getId())))
throw new IllegalStateException ("Can only copy data for same session id");
if (data == this)
return; //don't copy ourself
setLastNode(data.getLastNode());
setContextPath(data.getContextPath());
setVhost(data.getVhost());
setCookieSet(data.getCookieSet());
setCreated(data.getCreated());
setAccessed(data.getAccessed());
setLastAccessed(data.getLastAccessed());
setMaxInactiveMs(data.getMaxInactiveMs());
setExpiry(data.getExpiry());
setLastSaved(data.getLastSaved());
clearAllAttributes();
putAllAttributes(data.getAllAttributes());
}
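copy() is what re-inflates a cached but passivated session once fresh SessionData has been loaded from the SessionDataStore. A sketch of that activation path, mirroring the AbstractSessionStore code earlier in this patch; the variable names are illustrative.

    // Sketch of how copy() is used on activation; `session` is a cached,
    // passivated Session and `fresh` was just loaded for the same id.
    if (fresh != null)
    {
        session.setPassivated(false);
        session.getSessionData().copy(fresh);  // throws IllegalStateException if the ids differ
        session.didActivate();                 // notify listeners that state has been restored
    }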
/**
* @return time at which session was last written out
*/
@ -170,6 +200,14 @@ public class SessionData implements Serializable
_attributes.putAll(attributes);
}
/**
* Remove all attributes
*/
public void clearAllAttributes ()
{
_attributes.clear();
}
/**
* @return an unmodifiable map of the attributes
*/
@ -322,12 +360,6 @@ public class SessionData implements Serializable
_lastAccessed = lastAccessed;
}
/* public boolean isInvalid()
{
return _invalid;
}
*/
/**
* @return


@ -0,0 +1,34 @@
//
// ========================================================================
// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.server.session;
/**
* SessionInspector
*
* A visitor over the sessions in a SessionStore: preInspection() is called
* before a pass, inspect(Session) once per session, and postInspection() after.
*/
public interface SessionInspector
{
public void preInspection();
public void inspect(Session s);
public void postInspection (); //on completion
}
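The interface is a small visitor contract: preInspection() once per pass, inspect(Session) for every session in the store, postInspection() when the pass completes. A minimal custom inspector, illustrative and not part of this patch, could simply count valid sessions:

    // Sketch: a trivial SessionInspector that counts valid sessions per pass.
    public class CountingInspector implements SessionInspector
    {
        private int _valid;

        @Override
        public void preInspection()
        {
            _valid = 0;                        // reset before each inspection pass
        }

        @Override
        public void inspect(Session s)
        {
            if (s.isValid())
                _valid++;
        }

        @Override
        public void postInspection()
        {
            System.err.println("valid sessions: " + _valid);
        }
    }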


@ -1017,13 +1017,14 @@ public class SessionManager extends ContainerLifeCycle implements org.eclipse.je
/**
* Inspect all sessions, gathering expiry and idle candidates
*/
public Set<String> scavenge ()
public void inspect ()
{
        //don't attempt to inspect if we are shutting down
if (isStopping() || isStopped())
return Collections.emptySet();
return;
return _sessionStore.getExpired();
_sessionStore.inspect();
//return _sessionStore.getExpired();
}


@ -19,7 +19,9 @@
package org.eclipse.jetty.server.session;
import java.util.List;
import java.util.Set;
import java.util.stream.Stream;
import javax.servlet.http.HttpServletRequest;
@ -43,5 +45,9 @@ public interface SessionStore extends LifeCycle
boolean exists (String id) throws Exception;
Session delete (String id) throws Exception;
void shutdown ();
Set<String> getExpired ();
Set<String> checkExpiration (Set<String> candidates);
void inspect();
void setIdlePassivationTimeoutSec(int sec);
int getIdlePassivationTimeoutSec();
Stream<Session> getStream();
}


@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
import java.util.Enumeration;
import java.util.Map;
import java.util.Set;
import java.util.stream.Stream;
import javax.servlet.SessionCookieConfig;
import javax.servlet.http.HttpServletRequest;
@ -113,14 +114,16 @@ public class SessionCookieTest
}
/**
* @see org.eclipse.jetty.server.session.AbstractSessionStore#doGetExpiredCandidates()
* @see org.eclipse.jetty.server.session.SessionStore#getStream()
*/
@Override
public Set<String> doGetExpiredCandidates()
public Stream<Session> getStream()
{
// TODO Auto-generated method stub
return null;
}
}


@ -1521,17 +1521,21 @@ public class ServletHandler extends ScopedHandler
//if the request already does not support async, then the setting for the filter
                //is irrelevant. However, if the request supports async but this filter does not,
                //temporarily turn it off for the execution of the filter
boolean requestAsyncSupported = baseRequest.isAsyncSupported();
try
if (baseRequest.isAsyncSupported() && !_filterHolder.isAsyncSupported())
{
if (!_filterHolder.isAsyncSupported() && requestAsyncSupported)
baseRequest.setAsyncSupported(false);
try
{
baseRequest.setAsyncSupported(false,_filterHolder.toString());
filter.doFilter(request, response, _next);
}
finally
{
baseRequest.setAsyncSupported(true,null);
}
}
else
filter.doFilter(request, response, _next);
}
finally
{
baseRequest.setAsyncSupported(requestAsyncSupported);
}
return;
}
@ -1594,17 +1598,21 @@ public class ServletHandler extends ScopedHandler
//if the request already does not support async, then the setting for the filter
                //is irrelevant. However, if the request supports async but this filter does not,
                //temporarily turn it off for the execution of the filter
boolean requestAsyncSupported = _baseRequest.isAsyncSupported();
try
if (!holder.isAsyncSupported() && _baseRequest.isAsyncSupported())
{
if (!holder.isAsyncSupported() && requestAsyncSupported)
_baseRequest.setAsyncSupported(false);
try
{
_baseRequest.setAsyncSupported(false,holder.toString());
filter.doFilter(request, response, this);
}
finally
{
_baseRequest.setAsyncSupported(true,null);
}
}
else
filter.doFilter(request, response, this);
}
finally
{
_baseRequest.setAsyncSupported(requestAsyncSupported);
}
return;
}
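With this change, a filter or servlet that does not support async only has async switched off for the duration of its own doFilter/service call, and support is restored in the finally block; within that window a call to startAsync() is rejected. A sketch of setting up such a servlet; the servlet class and mapping are illustrative (the new AsyncServletTest below exercises the real behaviour).

    // Sketch: registering a servlet with async disabled; startAsync() inside it
    // then fails and surfaces as a 500 response. `MyServlet` is hypothetical.
    ServletHolder holder = new ServletHolder("NoAsync", new MyServlet());
    holder.setAsyncSupported(false);
    servletHandler.addServletWithMapping(holder, "/noasync/*");

    // inside MyServlet.service(request, response):
    //     request.startAsync();  // IllegalStateException, because async support was turned off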


@ -822,10 +822,20 @@ public class ServletHolder extends Holder<Servlet> implements UserIdentity.Scope
if (_identityService!=null)
old_run_as=_identityService.setRunAs(baseRequest.getResolvedUserIdentity(),_runAsToken);
if (!isAsyncSupported())
baseRequest.setAsyncSupported(false);
servlet.service(request,response);
if (baseRequest.isAsyncSupported() && !isAsyncSupported())
{
try
{
baseRequest.setAsyncSupported(false,this.toString());
servlet.service(request,response);
}
finally
{
baseRequest.setAsyncSupported(true,null);
}
}
else
servlet.service(request,response);
}
catch(UnavailableException e)
{
@ -834,8 +844,6 @@ public class ServletHolder extends Holder<Servlet> implements UserIdentity.Scope
}
finally
{
baseRequest.setAsyncSupported(suspendable);
// Pop run-as role.
if (_identityService!=null)
_identityService.unsetRunAs(old_run_as);


@ -56,6 +56,7 @@ import org.eclipse.jetty.toolchain.test.AdvancedRunner;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.URIUtil;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.log.StdErrLog;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
@ -63,6 +64,7 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import static org.eclipse.jetty.util.log.Log.getLogger;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.Assert.assertEquals;
@ -125,6 +127,9 @@ public class AsyncServletTest
_servletHandler.addServletWithMapping(holder,"/path2/*");
_servletHandler.addServletWithMapping(holder,"/p th3/*");
_servletHandler.addServletWithMapping(new ServletHolder(new FwdServlet()),"/fwd/*");
ServletHolder holder2=new ServletHolder("NoAsync",_servlet);
holder2.setAsyncSupported(false);
_servletHandler.addServletWithMapping(holder2,"/noasync/*");
_server.start();
_port=_connector.getLocalPort();
__history.clear();
@ -177,6 +182,44 @@ public class AsyncServletTest
assertContains("NORMAL",response);
}
@Test
public void testAsyncNotSupportedNoAsync() throws Exception
{
_expectedCode="200 ";
String response=process("noasync","",null);
Assert.assertThat(response,Matchers.startsWith("HTTP/1.1 200 OK"));
assertThat(__history,contains(
"REQUEST /ctx/noasync/info",
"initial"
));
assertContains("NORMAL",response);
}
@Test
public void testAsyncNotSupportedAsync() throws Exception
{
((StdErrLog)getLogger(ServletHandler.class)).setHideStacks(true);
try
{
_expectedCode="500 ";
String response=process("noasync","start=200",null);
Assert.assertThat(response,Matchers.startsWith("HTTP/1.1 500 "));
assertThat(__history,contains(
"REQUEST /ctx/noasync/info",
"initial"
));
assertContains("HTTP ERROR: 500",response);
assertContains("!asyncSupported",response);
assertContains("AsyncServletTest$AsyncServlet",response);
}
finally
{
((StdErrLog)getLogger(ServletHandler.class)).setHideStacks(false);
}
}
@Test
public void testStart() throws Exception
{


@ -19,22 +19,13 @@
package org.eclipse.jetty.util;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;
import java.nio.charset.StandardCharsets;
import java.nio.file.OpenOption;
import java.util.Arrays;
import java.util.concurrent.ThreadLocalRandom;
@ -44,6 +35,12 @@ import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
public class BufferUtilTest
{
@Test
@ -359,9 +356,9 @@ public class BufferUtilTest
assertTrue(BufferUtil.isMappedBuffer(mapped));
ByteBuffer direct = BufferUtil.allocateDirect(data.length());
direct.clear();
BufferUtil.clearToFill(direct);
direct.put(data.getBytes(StandardCharsets.ISO_8859_1));
direct.flip();
BufferUtil.flipToFlush(direct, 0);
assertEquals(data,BufferUtil.toString(direct));
assertFalse(BufferUtil.isMappedBuffer(direct));
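The test now drives the direct buffer through BufferUtil's fill/flush helpers instead of raw clear()/flip(). A standalone sketch of that idiom, using only the BufferUtil calls that appear in this test and assuming the imports already present in it; the string content is illustrative.

    // Sketch of the BufferUtil fill/flush idiom.
    ByteBuffer buffer = BufferUtil.allocateDirect(64);
    BufferUtil.clearToFill(buffer);                               // empty, positioned for put()
    buffer.put("hello".getBytes(StandardCharsets.ISO_8859_1));
    BufferUtil.flipToFlush(buffer, 0);                            // positioned for get()/toString()
    assert "hello".equals(BufferUtil.toString(buffer));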


@ -71,6 +71,7 @@ import static org.junit.Assert.assertThat;
public class HttpClientLoadTest extends AbstractTest
{
private final Logger logger = Log.getLogger(HttpClientLoadTest.class);
private final AtomicLong requestCount = new AtomicLong();
private final AtomicLong connectionLeaks = new AtomicLong();
public HttpClientLoadTest(Transport transport)
@ -236,7 +237,7 @@ public class HttpClientLoadTest extends AbstractTest
logger.info("{} requests in {} ms, {} req/s", iterations, elapsed, elapsed > 0 ? iterations * 1000 / elapsed : -1);
for (String failure : failures)
System.err.println("FAILED: "+failure);
logger.info("FAILED: {}", failure);
Assert.assertTrue(failures.toString(), failures.isEmpty());
}
@ -267,8 +268,10 @@ public class HttpClientLoadTest extends AbstractTest
private void test(String scheme, String host, String method, boolean clientClose, boolean serverClose, int contentLength, final boolean checkContentLength, final CountDownLatch latch, final List<String> failures)
{
long requestId = requestCount.incrementAndGet();
Request request = client.newRequest(host, connector.getLocalPort())
.scheme(scheme)
.path("/" + requestId)
.method(method);
if (clientClose)
@ -326,7 +329,8 @@ public class HttpClientLoadTest extends AbstractTest
latch.countDown();
}
});
await(requestLatch, 5, TimeUnit.SECONDS);
if (!await(requestLatch, 5, TimeUnit.SECONDS))
logger.warn("Request {} took too long", requestId);
}
private boolean await(CountDownLatch latch, long time, TimeUnit unit)


@ -20,6 +20,7 @@ package org.eclipse.jetty.server.session;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNotNull;
import java.io.IOException;
import java.io.PrintWriter;
@ -115,6 +116,7 @@ public abstract class AbstractImmortalSessionTest
else if ("get".equals(action))
{
HttpSession session = request.getSession(false);
assertNotNull(session);
if (session!=null)
result = (String)session.getAttribute("value");
}


@ -42,7 +42,7 @@ public abstract class AbstractTestServer
protected final int _scavengePeriod;
protected final ContextHandlerCollection _contexts;
protected SessionIdManager _sessionIdManager;
private SessionScavenger _scavenger;
private PeriodicSessionInspector _scavenger;
@ -83,8 +83,8 @@ public abstract class AbstractTestServer
_sessionIdManager = newSessionIdManager(sessionIdMgrConfig);
_server.setSessionIdManager(_sessionIdManager);
((AbstractSessionIdManager) _sessionIdManager).setServer(_server);
_scavenger = new SessionScavenger();
_scavenger.setScavengeIntervalSec(scavengePeriod);
_scavenger = new PeriodicSessionInspector();
_scavenger.setIntervalSec(scavengePeriod);
((AbstractSessionIdManager)_sessionIdManager).setSessionScavenger(_scavenger);
}