Merge branch 'master' into jetty-9.4.x-Feature
This commit is contained in:
commit
d1504055de
51
VERSION.txt
51
VERSION.txt
|
@ -1,5 +1,48 @@
|
|||
jetty-9.4.0-SNAPSHOT
|
||||
|
||||
jetty-9.3.8.RC0 - 25 February 2016
|
||||
+ 81 Exception not always thrown in Jetty to application when upload part is
|
||||
too big
|
||||
+ 82 Request.getPart() that results in Exception still allows other parts to
|
||||
be fetched
|
||||
+ 251 Removing SSLEngine.beginHandshake() calls
|
||||
+ 285 PathContentProvider - Use of Direct buffers without pooling
|
||||
+ 298 qtp threads spin-locked in MBeanContainer.beanAdded
|
||||
+ 342 Reintroducing Response parameter to logExtended
|
||||
+ 344 init script does not properly display status of a non running service
|
||||
+ 346 HttpParser RFC2616 Compliance mode
|
||||
+ 347 Avoid sending request using a connection that is idle timing out
|
||||
+ 352 Integrate session idling for MongoSessionManager
|
||||
+ 354 Spin loop in case of exception thrown during accept()
|
||||
+ 355 Improve close behaviour
|
||||
+ 478918 Change javax.servlet.error,forward,include literals to
|
||||
RequestDispatcher constants
|
||||
+ 484446 InputStreamResponseListener's InputStream uses default read (3) and
|
||||
blocks early on never-ending response.
|
||||
+ 485306 HttpParser (HttpURI) mistaking basic auth password as a port number
|
||||
+ 485469 permessage-deflate extension causes protocol error in Firefox/Chrome
|
||||
+ 486394 Restore MultiPartFilter behavior with regards to temp file access
|
||||
+ 486497 NPE in MappedLoginService
|
||||
+ 486511 Server.getURI() returns wrong scheme on SSL/HTTPS
|
||||
+ 486530 Handler added to WebAppContext prevents ServletContext initialization
|
||||
+ 486589 HttpRequest has a wrong HTTP Version in HTTP/2.
|
||||
+ 486604 Add debug logging of ErrorPageErrorHandler logic
|
||||
+ 486674 Quickstart path attribute normalization should be based on longest
|
||||
path match
|
||||
+ 486829 Cancel stream error after a failed request with the HTTP/2.0 client.
|
||||
+ 486877 Google Chrome flagging 'obsolete cipher suite' in Jetty and will soon
|
||||
issue broken padlock
|
||||
+ 486930 Selector does not correctly handle rejected execution exception
|
||||
+ 487158 Switched SCM URIs to github
|
||||
+ 487197 Deflater/Inflater memory leak with WebSocket permessage-deflate
|
||||
extension
|
||||
+ 487198 ContextScopeListener should be called on context start and stop
|
||||
+ 487277 Introduce http-forwarded module for X-Forwarded support
|
||||
+ 487354 Aborted request or response does not send RST_STREAM frame.
|
||||
+ 487511 Jetty HTTP won't work on turkish systems.
|
||||
+ 487714 Avoid NPE in close race for async write
|
||||
+ 487750 HTTP/2 push must not be recursive.
|
||||
|
||||
jetty-9.2.15.v20160210 - 10 February 2016
|
||||
+ 482042 New API, Allow customization of ServletHandler path mapping
|
||||
+ 482243 Fixed GzipHandler for Include.
|
||||
|
@ -21,7 +64,7 @@ jetty-9.2.15.v20160210 - 10 February 2016
|
|||
+ 485535 jetty.sh results in FAILED when running service restart
|
||||
+ 485663 NullPointerException in WebSocketSession during upgrade with DEBUG
|
||||
logging
|
||||
+ 485712 Quickstart web.xml is absolute
|
||||
+ 485712 Quickstart web.xml is absolute
|
||||
|
||||
jetty-9.3.7.v20160115 - 15 January 2016
|
||||
+ 471171 Support SYNC_FLUSH in GzipHandler
|
||||
|
@ -1466,7 +1509,7 @@ jetty-9.1.2.v20140210 - 10 February 2014
|
|||
+ 423421 remove org.slf4j and org.ow2.asm from jetty-all artifact
|
||||
+ 424171 Old javax.activation jar interferes with email sending
|
||||
+ 424562 JDBCSessionManager.setNodeIdInSessionId(true) does not work
|
||||
+ 425275
|
||||
+ 425275
|
||||
org.eclipse.jetty.osgi.annotations.AnnotationConfiguration.BundleParserTask.getStatistic()
|
||||
returns null when debug is enabled.
|
||||
+ 425638 Fixed monitor module/xml typos
|
||||
|
@ -1769,7 +1812,7 @@ jetty-9.0.6.v20130930 - 30 September 2013
|
|||
in the value
|
||||
+ 415192 <jsp-file> maps to JspPropertyGroupServlet instead of JspServlet
|
||||
+ 415194 Deployer gives management of context to context collection
|
||||
+ 415302
|
||||
+ 415302
|
||||
+ 415330 Avoid multiple callbacks at EOF
|
||||
+ 415401 Add initalizeDefaults call to SpringConfigurationProcessor
|
||||
+ 415548 migrate ProxyHTTPToSPDYTest to use HttpClient to avoid intermittent
|
||||
|
@ -1909,7 +1952,7 @@ jetty-9.1.0.M0 - 16 September 2013
|
|||
+ 415131 Avoid autoboxing on debug
|
||||
+ 415192 <jsp-file> maps to JspPropertyGroupServlet instead of JspServlet
|
||||
+ 415194 Deployer gives management of context to context collection
|
||||
+ 415302
|
||||
+ 415302
|
||||
+ 415314 Jetty should not commit response on output if <
|
||||
Response.setBufferSize() bytes are written
|
||||
+ 415330 Avoid multiple callbacks at EOF
|
||||
|
|
|
@ -313,6 +313,9 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
|
|||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Aborted before processing {}: {}", exchange, cause);
|
||||
// Won't use this connection, release it back.
|
||||
if (!connectionPool.release(connection))
|
||||
connection.close();
|
||||
// It may happen that the request is aborted before the exchange
|
||||
// is created. Aborting the exchange a second time will result in
|
||||
// a no-operation, so we just abort here to cover that edge case.
|
||||
|
|
|
@ -32,6 +32,7 @@ import org.eclipse.jetty.client.api.ContentResponse;
|
|||
import org.eclipse.jetty.client.api.Request;
|
||||
import org.eclipse.jetty.client.util.BasicAuthentication;
|
||||
import org.eclipse.jetty.http.HttpHeader;
|
||||
import org.eclipse.jetty.http.HttpScheme;
|
||||
import org.eclipse.jetty.http.HttpStatus;
|
||||
import org.eclipse.jetty.server.handler.AbstractHandler;
|
||||
import org.eclipse.jetty.util.B64Code;
|
||||
|
@ -157,4 +158,100 @@ public class HttpClientProxyTest extends AbstractHttpClientServerTest
|
|||
Assert.assertEquals(status, response3.getStatus());
|
||||
Assert.assertEquals(1, requests.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAuthenticatedProxiedRequestWithRedirect() throws Exception
|
||||
{
|
||||
String user = "foo";
|
||||
String password = "bar";
|
||||
String credentials = B64Code.encode(user + ":" + password, StandardCharsets.ISO_8859_1);
|
||||
String proxyHost = "localhost";
|
||||
String serverHost = "server";
|
||||
int serverPort = HttpScheme.HTTP.is(scheme) ? 80 : 443;
|
||||
String realm = "test_realm";
|
||||
int status = HttpStatus.NO_CONTENT_204;
|
||||
start(new AbstractHandler()
|
||||
{
|
||||
@Override
|
||||
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
|
||||
{
|
||||
baseRequest.setHandled(true);
|
||||
if (target.startsWith("/proxy"))
|
||||
{
|
||||
String authorization = request.getHeader(HttpHeader.PROXY_AUTHORIZATION.asString());
|
||||
if (authorization == null)
|
||||
{
|
||||
response.setStatus(HttpStatus.PROXY_AUTHENTICATION_REQUIRED_407);
|
||||
response.setHeader(HttpHeader.PROXY_AUTHENTICATE.asString(), "Basic realm=\"" + realm + "\"");
|
||||
}
|
||||
else
|
||||
{
|
||||
String prefix = "Basic ";
|
||||
if (authorization.startsWith(prefix))
|
||||
{
|
||||
String attempt = authorization.substring(prefix.length());
|
||||
if (credentials.equals(attempt))
|
||||
{
|
||||
// Change also the host, to verify that proxy authentication works in this case too.
|
||||
response.sendRedirect(scheme + "://127.0.0.1:" + serverPort + "/server");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (target.startsWith("/server"))
|
||||
{
|
||||
response.setStatus(status);
|
||||
}
|
||||
else
|
||||
{
|
||||
response.sendError(HttpStatus.INTERNAL_SERVER_ERROR_500);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
int proxyPort = connector.getLocalPort();
|
||||
client.getProxyConfiguration().getProxies().add(new HttpProxy(proxyHost, proxyPort));
|
||||
|
||||
ContentResponse response1 = client.newRequest(serverHost, serverPort)
|
||||
.scheme(scheme)
|
||||
.path("/proxy")
|
||||
.timeout(5, TimeUnit.SECONDS)
|
||||
.send();
|
||||
|
||||
// No Authentication available => 407.
|
||||
Assert.assertEquals(HttpStatus.PROXY_AUTHENTICATION_REQUIRED_407, response1.getStatus());
|
||||
|
||||
// Add authentication...
|
||||
URI uri = URI.create(scheme + "://" + proxyHost + ":" + proxyPort);
|
||||
client.getAuthenticationStore().addAuthentication(new BasicAuthentication(uri, realm, user, password));
|
||||
final AtomicInteger requests = new AtomicInteger();
|
||||
client.getRequestListeners().add(new Request.Listener.Adapter()
|
||||
{
|
||||
@Override
|
||||
public void onSuccess(Request request)
|
||||
{
|
||||
requests.incrementAndGet();
|
||||
}
|
||||
});
|
||||
// ...and perform the request again => 407 + 302 + 204.
|
||||
ContentResponse response2 = client.newRequest(serverHost, serverPort)
|
||||
.scheme(scheme)
|
||||
.path("/proxy")
|
||||
.timeout(5, TimeUnit.SECONDS)
|
||||
.send();
|
||||
|
||||
Assert.assertEquals(status, response2.getStatus());
|
||||
Assert.assertEquals(3, requests.get());
|
||||
|
||||
// Now the authentication result is cached => 204.
|
||||
requests.set(0);
|
||||
ContentResponse response3 = client.newRequest(serverHost, serverPort)
|
||||
.scheme(scheme)
|
||||
.path("/server")
|
||||
.timeout(5, TimeUnit.SECONDS)
|
||||
.send();
|
||||
|
||||
Assert.assertEquals(status, response3.getStatus());
|
||||
Assert.assertEquals(1, requests.get());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -47,6 +47,33 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest
|
|||
super(sslContextFactory);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAbortBeforeQueued() throws Exception
|
||||
{
|
||||
start(new EmptyServerHandler());
|
||||
|
||||
Exception failure = new Exception("oops");
|
||||
try
|
||||
{
|
||||
Request request = client.newRequest("localhost", connector.getLocalPort())
|
||||
.scheme(scheme)
|
||||
.timeout(5, TimeUnit.SECONDS);
|
||||
request.abort(failure);
|
||||
request.send();
|
||||
Assert.fail();
|
||||
}
|
||||
catch (ExecutionException x)
|
||||
{
|
||||
Assert.assertSame(failure, x.getCause());
|
||||
// Make sure the pool is in a sane state.
|
||||
HttpDestination destination = (HttpDestination)client.getDestination(scheme, "localhost", connector.getLocalPort());
|
||||
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
|
||||
Assert.assertEquals(1, connectionPool.getConnectionCount());
|
||||
Assert.assertEquals(0, connectionPool.getActiveConnections().size());
|
||||
Assert.assertEquals(1, connectionPool.getIdleConnections().size());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAbortOnQueued() throws Exception
|
||||
{
|
||||
|
|
|
@ -89,7 +89,7 @@ public class ClientGenerator extends Generator
|
|||
beginRequestBuffer.putInt(0x00_08_00_00);
|
||||
// Hardcode RESPONDER role and KEEP_ALIVE flag
|
||||
beginRequestBuffer.putLong(0x00_01_01_00_00_00_00_00L);
|
||||
beginRequestBuffer.flip();
|
||||
BufferUtil.flipToFlush(beginRequestBuffer, 0);
|
||||
|
||||
int index = 0;
|
||||
while (fieldsLength > 0)
|
||||
|
@ -129,7 +129,7 @@ public class ClientGenerator extends Generator
|
|||
}
|
||||
|
||||
buffer.putShort(4, (short)length);
|
||||
buffer.flip();
|
||||
BufferUtil.flipToFlush(buffer, 0);
|
||||
}
|
||||
|
||||
|
||||
|
@ -140,7 +140,7 @@ public class ClientGenerator extends Generator
|
|||
// Generate the last FCGI_PARAMS frame
|
||||
lastParamsBuffer.putInt(0x01_04_00_00 + request);
|
||||
lastParamsBuffer.putInt(0x00_00_00_00);
|
||||
lastParamsBuffer.flip();
|
||||
BufferUtil.flipToFlush(lastParamsBuffer, 0);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
|
|
@ -58,7 +58,7 @@ public class Generator
|
|||
int length = Math.min(MAX_CONTENT_LENGTH, contentLength);
|
||||
buffer.putShort((short)length);
|
||||
buffer.putShort((short)0);
|
||||
buffer.flip();
|
||||
BufferUtil.flipToFlush(buffer, 0);
|
||||
|
||||
if (contentLength == 0)
|
||||
break;
|
||||
|
|
|
@ -95,7 +95,7 @@ public class ServerGenerator extends Generator
|
|||
buffer.put(bytes.get(i)).put(COLON).put(bytes.get(i + 1)).put(EOL);
|
||||
buffer.put(EOL);
|
||||
|
||||
buffer.flip();
|
||||
BufferUtil.flipToFlush(buffer, 0);
|
||||
|
||||
return generateContent(request, buffer, true, false, callback, FCGI.FrameType.STDOUT);
|
||||
}
|
||||
|
@ -129,7 +129,7 @@ public class ServerGenerator extends Generator
|
|||
endRequestBuffer.putInt(0x00_08_00_00);
|
||||
endRequestBuffer.putInt(aborted ? 1 : 0);
|
||||
endRequestBuffer.putInt(0);
|
||||
endRequestBuffer.flip();
|
||||
BufferUtil.flipToFlush(endRequestBuffer, 0);
|
||||
return endRequestBuffer;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,7 +27,7 @@
|
|||
<Arg>
|
||||
<Ref id="Server"/>
|
||||
</Arg>
|
||||
<Set name="workerName"><Property name="jetty.gcloudSession.workerName" default="node1"/></Set>
|
||||
<Set name="workerName"><Property name="jetty.gcloudSession.workerName"><Default>node<Env name="GAE_MODULE_INSTANCE" default="0"/></Default></Property></Set>
|
||||
<Set name="config"><Ref id="gconf"/></Set>
|
||||
</New>
|
||||
</Set>
|
||||
|
|
|
@ -52,7 +52,8 @@ https://github.com/GoogleCloudPlatform/gcloud-java
|
|||
http://www.apache.org/licenses/LICENSE-2.0.html
|
||||
|
||||
[ini-template]
|
||||
## Unique identifier for this node in the cluster
|
||||
## Unique identifier to force the workername for this node in the cluster
|
||||
## If not set, will default to the string "node" plus the Env variable $GAE_MODULE_INSTANCE
|
||||
# jetty.gcloudSession.workerName=node1
|
||||
|
||||
|
||||
|
|
|
@ -88,6 +88,56 @@ public class HTTP2Test extends AbstractTest
|
|||
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRequestNoContentResponseEmptyContent() throws Exception
|
||||
{
|
||||
start(new ServerSessionListener.Adapter()
|
||||
{
|
||||
@Override
|
||||
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
|
||||
{
|
||||
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
|
||||
stream.headers(new HeadersFrame(stream.getId(), response, null, false), new Callback()
|
||||
{
|
||||
@Override
|
||||
public void succeeded()
|
||||
{
|
||||
stream.data(new DataFrame(stream.getId(), BufferUtil.EMPTY_BUFFER, true), NOOP);
|
||||
}
|
||||
});
|
||||
return null;
|
||||
}
|
||||
});
|
||||
|
||||
Session session = newClient(new Session.Listener.Adapter());
|
||||
|
||||
HttpFields fields = new HttpFields();
|
||||
MetaData.Request metaData = newRequest("GET", fields);
|
||||
HeadersFrame frame = new HeadersFrame(metaData, null, true);
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
session.newStream(frame, new Promise.Adapter<>(), new Stream.Listener.Adapter()
|
||||
{
|
||||
@Override
|
||||
public void onHeaders(Stream stream, HeadersFrame frame)
|
||||
{
|
||||
Assert.assertFalse(frame.isEndStream());
|
||||
Assert.assertEquals(stream.getId(), frame.getStreamId());
|
||||
MetaData.Response response = (MetaData.Response)frame.getMetaData();
|
||||
Assert.assertEquals(200, response.getStatus());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onData(Stream stream, DataFrame frame, Callback callback)
|
||||
{
|
||||
Assert.assertTrue(frame.isEndStream());
|
||||
callback.succeeded();
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRequestNoContentResponseContent() throws Exception
|
||||
{
|
||||
|
|
|
@ -357,6 +357,7 @@ public class IdleTimeoutTest extends AbstractTest
|
|||
@Override
|
||||
public void onData(Stream stream, DataFrame frame, Callback callback)
|
||||
{
|
||||
callback.succeeded();
|
||||
dataLatch.countDown();
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,231 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.http2.client;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.LinkedBlockingDeque;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.eclipse.jetty.http.HttpFields;
|
||||
import org.eclipse.jetty.http.HttpStatus;
|
||||
import org.eclipse.jetty.http.HttpVersion;
|
||||
import org.eclipse.jetty.http.MetaData;
|
||||
import org.eclipse.jetty.http2.ISession;
|
||||
import org.eclipse.jetty.http2.api.Session;
|
||||
import org.eclipse.jetty.http2.api.Stream;
|
||||
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
|
||||
import org.eclipse.jetty.http2.frames.DataFrame;
|
||||
import org.eclipse.jetty.http2.frames.Frame;
|
||||
import org.eclipse.jetty.http2.frames.HeadersFrame;
|
||||
import org.eclipse.jetty.http2.frames.SettingsFrame;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.FuturePromise;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.hamcrest.Matchers;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
public class InterleavingTest extends AbstractTest
|
||||
{
|
||||
@Test
|
||||
public void testInterleaving() throws Exception
|
||||
{
|
||||
CountDownLatch serverStreamsLatch = new CountDownLatch(2);
|
||||
List<Stream> serverStreams = new ArrayList<>();
|
||||
start(new ServerSessionListener.Adapter()
|
||||
{
|
||||
@Override
|
||||
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
|
||||
{
|
||||
serverStreams.add(stream);
|
||||
serverStreamsLatch.countDown();
|
||||
return null;
|
||||
}
|
||||
});
|
||||
|
||||
int maxFrameSize = Frame.DEFAULT_MAX_LENGTH + 1;
|
||||
Session session = newClient(new Session.Listener.Adapter()
|
||||
{
|
||||
@Override
|
||||
public Map<Integer, Integer> onPreface(Session session)
|
||||
{
|
||||
Map<Integer, Integer> settings = new HashMap<>();
|
||||
settings.put(SettingsFrame.MAX_FRAME_SIZE, maxFrameSize);
|
||||
return settings;
|
||||
}
|
||||
});
|
||||
|
||||
BlockingQueue<FrameBytesCallback> dataFrames = new LinkedBlockingDeque<>();
|
||||
Stream.Listener streamListener = new Stream.Listener.Adapter()
|
||||
{
|
||||
@Override
|
||||
public void onData(Stream stream, DataFrame frame, Callback callback)
|
||||
{
|
||||
ByteBuffer data = frame.getData();
|
||||
byte[] bytes = new byte[data.remaining()];
|
||||
data.get(bytes);
|
||||
dataFrames.offer(new FrameBytesCallback(frame, bytes, callback));
|
||||
}
|
||||
};
|
||||
|
||||
HeadersFrame headersFrame1 = new HeadersFrame(newRequest("GET", new HttpFields()), null, true);
|
||||
FuturePromise<Stream> streamPromise1 = new FuturePromise<>();
|
||||
session.newStream(headersFrame1, streamPromise1, streamListener);
|
||||
streamPromise1.get(5, TimeUnit.SECONDS);
|
||||
|
||||
HeadersFrame headersFrame2 = new HeadersFrame(newRequest("GET", new HttpFields()), null, true);
|
||||
FuturePromise<Stream> streamPromise2 = new FuturePromise<>();
|
||||
session.newStream(headersFrame2, streamPromise2, streamListener);
|
||||
streamPromise2.get(5, TimeUnit.SECONDS);
|
||||
|
||||
Assert.assertTrue(serverStreamsLatch.await(5, TimeUnit.SECONDS));
|
||||
|
||||
Thread.sleep(1000);
|
||||
|
||||
Stream serverStream1 = serverStreams.get(0);
|
||||
Stream serverStream2 = serverStreams.get(1);
|
||||
MetaData.Response response1 = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields(), 0);
|
||||
serverStream1.headers(new HeadersFrame(serverStream1.getId(), response1, null, false), Callback.NOOP);
|
||||
|
||||
Random random = new Random();
|
||||
byte[] content1 = new byte[2 * ((ISession)serverStream1.getSession()).updateSendWindow(0)];
|
||||
random.nextBytes(content1);
|
||||
byte[] content2 = new byte[2 * ((ISession)serverStream2.getSession()).updateSendWindow(0)];
|
||||
random.nextBytes(content2);
|
||||
|
||||
MetaData.Response response2 = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields(), 0);
|
||||
serverStream2.headers(new HeadersFrame(serverStream2.getId(), response2, null, false), new Callback()
|
||||
{
|
||||
@Override
|
||||
public void succeeded()
|
||||
{
|
||||
// Write data for both streams from within the callback so that they get queued together.
|
||||
|
||||
ByteBuffer buffer1 = ByteBuffer.wrap(content1);
|
||||
serverStream1.data(new DataFrame(serverStream1.getId(), buffer1, true), NOOP);
|
||||
|
||||
ByteBuffer buffer2 = ByteBuffer.wrap(content2);
|
||||
serverStream2.data(new DataFrame(serverStream2.getId(), buffer2, true), NOOP);
|
||||
}
|
||||
});
|
||||
|
||||
// The client reads with a buffer size that is different from the
|
||||
// frame size and synthesizes DATA frames, so expect N frames for
|
||||
// stream1 up to maxFrameSize of data, then M frames for stream2
|
||||
// up to maxFrameSize of data, and so forth, interleaved.
|
||||
|
||||
Map<Integer, ByteArrayOutputStream> contents = new HashMap<>();
|
||||
contents.put(serverStream1.getId(), new ByteArrayOutputStream());
|
||||
contents.put(serverStream2.getId(), new ByteArrayOutputStream());
|
||||
List<StreamLength> streamLengths = new ArrayList<>();
|
||||
int finished = 0;
|
||||
while (finished < 2)
|
||||
{
|
||||
FrameBytesCallback frameBytesCallback = dataFrames.poll(5, TimeUnit.SECONDS);
|
||||
if (frameBytesCallback == null)
|
||||
Assert.fail();
|
||||
|
||||
DataFrame dataFrame = frameBytesCallback.frame;
|
||||
int streamId = dataFrame.getStreamId();
|
||||
int length = dataFrame.remaining();
|
||||
streamLengths.add(new StreamLength(streamId, length));
|
||||
if (dataFrame.isEndStream())
|
||||
++finished;
|
||||
|
||||
contents.get(streamId).write(frameBytesCallback.bytes);
|
||||
|
||||
frameBytesCallback.callback.succeeded();
|
||||
}
|
||||
|
||||
// Verify that the content has been sent properly.
|
||||
Assert.assertArrayEquals(content1, contents.get(serverStream1.getId()).toByteArray());
|
||||
Assert.assertArrayEquals(content2, contents.get(serverStream2.getId()).toByteArray());
|
||||
|
||||
// Verify that the interleaving is correct.
|
||||
Map<Integer, List<Integer>> groups = new HashMap<>();
|
||||
groups.put(serverStream1.getId(), new ArrayList<>());
|
||||
groups.put(serverStream2.getId(), new ArrayList<>());
|
||||
int currentStream = 0;
|
||||
int currentLength = 0;
|
||||
for (StreamLength streamLength : streamLengths)
|
||||
{
|
||||
if (currentStream == 0)
|
||||
currentStream = streamLength.stream;
|
||||
if (currentStream != streamLength.stream)
|
||||
{
|
||||
groups.get(currentStream).add(currentLength);
|
||||
currentStream = streamLength.stream;
|
||||
currentLength = 0;
|
||||
}
|
||||
currentLength += streamLength.length;
|
||||
}
|
||||
groups.get(currentStream).add(currentLength);
|
||||
|
||||
Logger logger = Log.getLogger(getClass());
|
||||
logger.debug("frame lengths = {}", streamLengths);
|
||||
|
||||
groups.forEach((stream, lengths) ->
|
||||
{
|
||||
logger.debug("stream {} interleaved lengths = {}", stream, lengths);
|
||||
for (Integer length : lengths)
|
||||
Assert.assertThat(length, Matchers.lessThanOrEqualTo(maxFrameSize));
|
||||
});
|
||||
}
|
||||
|
||||
private static class FrameBytesCallback
|
||||
{
|
||||
private final DataFrame frame;
|
||||
private final byte[] bytes;
|
||||
private final Callback callback;
|
||||
|
||||
private FrameBytesCallback(DataFrame frame, byte[] bytes, Callback callback)
|
||||
{
|
||||
this.frame = frame;
|
||||
this.bytes = bytes;
|
||||
this.callback = callback;
|
||||
}
|
||||
}
|
||||
|
||||
private static class StreamLength
|
||||
{
|
||||
private final int stream;
|
||||
private final int length;
|
||||
|
||||
private StreamLength(int stream, int length)
|
||||
{
|
||||
this.stream = stream;
|
||||
this.length = length;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return String.format("(%d,%d)", stream, length);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -53,6 +53,7 @@ import org.eclipse.jetty.server.Connector;
|
|||
import org.eclipse.jetty.server.HttpConfiguration;
|
||||
import org.eclipse.jetty.server.HttpConnectionFactory;
|
||||
import org.eclipse.jetty.util.ArrayQueue;
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.Promise;
|
||||
import org.junit.Assert;
|
||||
|
@ -168,12 +169,12 @@ public class PrefaceTest extends AbstractTest
|
|||
ByteBuffer buffer = byteBufferPool.acquire(1024, true);
|
||||
while (true)
|
||||
{
|
||||
BufferUtil.clearToFill(buffer);
|
||||
int read = socket.read(buffer);
|
||||
buffer.flip();
|
||||
BufferUtil.flipToFlush(buffer, 0);
|
||||
if (read < 0)
|
||||
break;
|
||||
parser.parse(buffer);
|
||||
buffer.clear();
|
||||
}
|
||||
|
||||
Assert.assertEquals(2, settings.size());
|
||||
|
@ -248,9 +249,9 @@ public class PrefaceTest extends AbstractTest
|
|||
ByteBuffer buffer = byteBufferPool.acquire(1024, true);
|
||||
http1: while (true)
|
||||
{
|
||||
buffer.clear();
|
||||
BufferUtil.clearToFill(buffer);
|
||||
int read = socket.read(buffer);
|
||||
buffer.flip();
|
||||
BufferUtil.flipToFlush(buffer, 0);
|
||||
if (read < 0)
|
||||
Assert.fail();
|
||||
|
||||
|
@ -314,9 +315,9 @@ public class PrefaceTest extends AbstractTest
|
|||
if (responded.get())
|
||||
break;
|
||||
|
||||
buffer.clear();
|
||||
BufferUtil.clearToFill(buffer);
|
||||
int read = socket.read(buffer);
|
||||
buffer.flip();
|
||||
BufferUtil.flipToFlush(buffer, 0);
|
||||
if (read < 0)
|
||||
Assert.fail();
|
||||
}
|
||||
|
|
|
@ -101,6 +101,7 @@ public class PushCacheFilterTest extends AbstractTest
|
|||
@Override
|
||||
public void onData(Stream stream, DataFrame frame, Callback callback)
|
||||
{
|
||||
callback.succeeded();
|
||||
warmupLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
@ -188,6 +189,7 @@ public class PushCacheFilterTest extends AbstractTest
|
|||
@Override
|
||||
public void onData(Stream stream, DataFrame frame, Callback callback)
|
||||
{
|
||||
callback.succeeded();
|
||||
warmupLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
@ -273,6 +275,7 @@ public class PushCacheFilterTest extends AbstractTest
|
|||
@Override
|
||||
public void onData(Stream stream, DataFrame frame, Callback callback)
|
||||
{
|
||||
callback.succeeded();
|
||||
warmupLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
@ -298,6 +301,7 @@ public class PushCacheFilterTest extends AbstractTest
|
|||
@Override
|
||||
public void onData(Stream stream, DataFrame frame, Callback callback)
|
||||
{
|
||||
callback.succeeded();
|
||||
pushLatch.countDown();
|
||||
}
|
||||
};
|
||||
|
@ -325,6 +329,7 @@ public class PushCacheFilterTest extends AbstractTest
|
|||
@Override
|
||||
public void onData(Stream stream, DataFrame frame, Callback callback)
|
||||
{
|
||||
callback.succeeded();
|
||||
if (frame.isEndStream())
|
||||
secondaryResponseLatch.countDown();
|
||||
}
|
||||
|
@ -372,6 +377,7 @@ public class PushCacheFilterTest extends AbstractTest
|
|||
@Override
|
||||
public void onData(Stream stream, DataFrame frame, Callback callback)
|
||||
{
|
||||
callback.succeeded();
|
||||
warmupLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
@ -655,6 +661,7 @@ public class PushCacheFilterTest extends AbstractTest
|
|||
@Override
|
||||
public void onData(Stream stream, DataFrame frame, Callback callback)
|
||||
{
|
||||
callback.succeeded();
|
||||
if (frame.isEndStream())
|
||||
warmupLatch.countDown();
|
||||
}
|
||||
|
@ -676,6 +683,7 @@ public class PushCacheFilterTest extends AbstractTest
|
|||
@Override
|
||||
public void onData(Stream stream, DataFrame frame, Callback callback)
|
||||
{
|
||||
callback.succeeded();
|
||||
if (frame.isEndStream())
|
||||
primaryResponseLatch.countDown();
|
||||
}
|
||||
|
|
|
@ -130,8 +130,14 @@ public class StreamCloseTest extends AbstractTest
|
|||
public void onData(final Stream stream, DataFrame frame, final Callback callback)
|
||||
{
|
||||
Assert.assertTrue(((HTTP2Stream)stream).isRemotelyClosed());
|
||||
|
||||
// We must copy the data that we send asynchronously.
|
||||
ByteBuffer data = frame.getData();
|
||||
ByteBuffer copy = ByteBuffer.allocate(data.remaining());
|
||||
copy.put(data).flip();
|
||||
|
||||
completable.thenRun(() ->
|
||||
stream.data(frame, new Callback()
|
||||
stream.data(new DataFrame(stream.getId(), copy, frame.isEndStream()), new Callback()
|
||||
{
|
||||
@Override
|
||||
public void succeeded()
|
||||
|
@ -157,6 +163,7 @@ public class StreamCloseTest extends AbstractTest
|
|||
public void onData(Stream stream, DataFrame frame, Callback callback)
|
||||
{
|
||||
// The sent data callback may not be notified yet here.
|
||||
callback.succeeded();
|
||||
completeLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
|
|
@ -70,6 +70,10 @@ public class StreamCountTest extends AbstractTest
|
|||
MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, fields);
|
||||
stream.headers(new HeadersFrame(stream.getId(), metaData, null, true), callback);
|
||||
}
|
||||
else
|
||||
{
|
||||
callback.succeeded();
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
@ -144,6 +148,10 @@ public class StreamCountTest extends AbstractTest
|
|||
MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, fields);
|
||||
stream.headers(new HeadersFrame(stream.getId(), metaData, null, true), callback);
|
||||
}
|
||||
else
|
||||
{
|
||||
callback.succeeded();
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
|
|
@ -181,6 +181,7 @@ public class StreamResetTest extends AbstractTest
|
|||
@Override
|
||||
public void onData(Stream stream, DataFrame frame, Callback callback)
|
||||
{
|
||||
callback.succeeded();
|
||||
stream1DataLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
@ -196,6 +197,7 @@ public class StreamResetTest extends AbstractTest
|
|||
@Override
|
||||
public void onData(Stream stream, DataFrame frame, Callback callback)
|
||||
{
|
||||
callback.succeeded();
|
||||
stream2DataLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
|
|
@ -22,9 +22,7 @@ import java.nio.ByteBuffer;
|
|||
import java.nio.channels.ClosedChannelException;
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Queue;
|
||||
|
||||
import org.eclipse.jetty.http2.frames.Frame;
|
||||
|
@ -43,11 +41,11 @@ public class HTTP2Flusher extends IteratingCallback
|
|||
|
||||
private final Queue<WindowEntry> windows = new ArrayDeque<>();
|
||||
private final ArrayQueue<Entry> frames = new ArrayQueue<>(ArrayQueue.DEFAULT_CAPACITY, ArrayQueue.DEFAULT_GROWTH, this);
|
||||
private final Map<IStream, Integer> streams = new HashMap<>();
|
||||
private final List<Entry> resets = new ArrayList<>();
|
||||
private final Queue<Entry> entries = new ArrayDeque<>();
|
||||
private final List<Entry> actives = new ArrayList<>();
|
||||
private final HTTP2Session session;
|
||||
private final ByteBufferPool.Lease lease;
|
||||
private Entry stalled;
|
||||
private boolean terminated;
|
||||
|
||||
public HTTP2Flusher(HTTP2Session session)
|
||||
|
@ -106,17 +104,6 @@ public class HTTP2Flusher extends IteratingCallback
|
|||
return !closed;
|
||||
}
|
||||
|
||||
private Entry remove(int index)
|
||||
{
|
||||
synchronized (this)
|
||||
{
|
||||
if (index == 0)
|
||||
return frames.pollUnsafe();
|
||||
else
|
||||
return frames.remove(index);
|
||||
}
|
||||
}
|
||||
|
||||
public int getQueueSize()
|
||||
{
|
||||
synchronized (this)
|
||||
|
@ -136,112 +123,75 @@ public class HTTP2Flusher extends IteratingCallback
|
|||
if (terminated)
|
||||
throw new ClosedChannelException();
|
||||
|
||||
// First thing, update the window sizes, so we can
|
||||
// reason about the frames to remove from the queue.
|
||||
while (!windows.isEmpty())
|
||||
{
|
||||
WindowEntry entry = windows.poll();
|
||||
entry.perform();
|
||||
}
|
||||
|
||||
// Now the window sizes cannot change.
|
||||
// Window updates that happen concurrently will
|
||||
// be queued and processed on the next iteration.
|
||||
int sessionWindow = session.getSendWindow();
|
||||
|
||||
int index = 0;
|
||||
int size = frames.size();
|
||||
while (index < size)
|
||||
if (!frames.isEmpty())
|
||||
{
|
||||
Entry entry = frames.get(index);
|
||||
IStream stream = entry.stream;
|
||||
|
||||
// If the stream has been reset, don't send the frame.
|
||||
if (stream != null && stream.isReset() && !entry.isProtocol())
|
||||
for (Entry entry : frames)
|
||||
{
|
||||
remove(index);
|
||||
--size;
|
||||
resets.add(entry);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Gathered for reset {}", entry);
|
||||
continue;
|
||||
entries.offer(entry);
|
||||
actives.add(entry);
|
||||
}
|
||||
|
||||
// Check if the frame fits in the flow control windows.
|
||||
int remaining = entry.dataRemaining();
|
||||
if (remaining > 0)
|
||||
{
|
||||
if (sessionWindow <= 0)
|
||||
{
|
||||
++index;
|
||||
// There may be *non* flow controlled frames to send.
|
||||
continue;
|
||||
}
|
||||
|
||||
if (stream != null)
|
||||
{
|
||||
// The stream may have a smaller window than the session.
|
||||
Integer streamWindow = streams.get(stream);
|
||||
if (streamWindow == null)
|
||||
{
|
||||
streamWindow = stream.updateSendWindow(0);
|
||||
streams.put(stream, streamWindow);
|
||||
}
|
||||
|
||||
// Is it a frame belonging to an already stalled stream ?
|
||||
if (streamWindow <= 0)
|
||||
{
|
||||
++index;
|
||||
// There may be *non* flow controlled frames to send.
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
// The frame fits both flow control windows, reduce them.
|
||||
sessionWindow -= remaining;
|
||||
if (stream != null)
|
||||
streams.put(stream, streams.get(stream) - remaining);
|
||||
}
|
||||
|
||||
// The frame will be written, remove it from the queue.
|
||||
remove(index);
|
||||
--size;
|
||||
actives.add(entry);
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Gathered for write {}", entry);
|
||||
frames.clear();
|
||||
}
|
||||
streams.clear();
|
||||
}
|
||||
|
||||
// Perform resets outside the sync block.
|
||||
for (int i = 0; i < resets.size(); ++i)
|
||||
{
|
||||
Entry entry = resets.get(i);
|
||||
entry.reset();
|
||||
}
|
||||
resets.clear();
|
||||
|
||||
if (actives.isEmpty())
|
||||
if (entries.isEmpty())
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Flushed {}", session);
|
||||
return Action.IDLE;
|
||||
}
|
||||
|
||||
for (int i = 0; i < actives.size(); ++i)
|
||||
while (!entries.isEmpty())
|
||||
{
|
||||
Entry entry = actives.get(i);
|
||||
Throwable failure = entry.generate(lease);
|
||||
if (failure != null)
|
||||
Entry entry = entries.poll();
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Processing {}", entry);
|
||||
|
||||
// If the stream has been reset, don't send the frame.
|
||||
if (entry.reset())
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Resetting {}", entry);
|
||||
continue;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
if (entry.generate(lease))
|
||||
{
|
||||
if (entry.dataRemaining() > 0)
|
||||
entries.offer(entry);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (stalled == null)
|
||||
stalled = entry;
|
||||
}
|
||||
}
|
||||
catch (Throwable failure)
|
||||
{
|
||||
// Failure to generate the entry is catastrophic.
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Failure generating frame " + entry.frame, failure);
|
||||
failed(failure);
|
||||
return Action.SUCCEEDED;
|
||||
}
|
||||
}
|
||||
|
||||
List<ByteBuffer> byteBuffers = lease.getByteBuffers();
|
||||
if (byteBuffers.isEmpty())
|
||||
{
|
||||
complete();
|
||||
return Action.IDLE;
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Writing {} buffers ({} bytes) for {} frames {}", byteBuffers.size(), lease.getTotalLength(), actives.size(), actives);
|
||||
session.getEndPoint().write(this, byteBuffers.toArray(new ByteBuffer[byteBuffers.size()]));
|
||||
|
@ -251,17 +201,45 @@ public class HTTP2Flusher extends IteratingCallback
|
|||
@Override
|
||||
public void succeeded()
|
||||
{
|
||||
lease.recycle();
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Written {} frames for {}", actives.size(), actives);
|
||||
|
||||
actives.forEach(Entry::succeeded);
|
||||
actives.clear();
|
||||
complete();
|
||||
|
||||
super.succeeded();
|
||||
}
|
||||
|
||||
private void complete()
|
||||
{
|
||||
lease.recycle();
|
||||
|
||||
actives.forEach(Entry::complete);
|
||||
|
||||
if (stalled != null)
|
||||
{
|
||||
// We have written part of the frame, but there is more to write.
|
||||
// The API will not allow to send two data frames for the same
|
||||
// stream so we append the unfinished frame at the end to allow
|
||||
// better interleaving with other streams.
|
||||
int index = actives.indexOf(stalled);
|
||||
for (int i = index; i < actives.size(); ++i)
|
||||
{
|
||||
Entry entry = actives.get(i);
|
||||
if (entry.dataRemaining() > 0)
|
||||
append(entry);
|
||||
}
|
||||
for (int i = 0; i < index; ++i)
|
||||
{
|
||||
Entry entry = actives.get(i);
|
||||
if (entry.dataRemaining() > 0)
|
||||
append(entry);
|
||||
}
|
||||
stalled = null;
|
||||
}
|
||||
|
||||
actives.clear();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onCompleteSuccess()
|
||||
{
|
||||
|
@ -317,6 +295,7 @@ public class HTTP2Flusher extends IteratingCallback
|
|||
protected final Frame frame;
|
||||
protected final IStream stream;
|
||||
protected final Callback callback;
|
||||
private boolean reset;
|
||||
|
||||
protected Entry(Frame frame, IStream stream, Callback callback)
|
||||
{
|
||||
|
@ -330,14 +309,14 @@ public class HTTP2Flusher extends IteratingCallback
|
|||
return 0;
|
||||
}
|
||||
|
||||
public Throwable generate(ByteBufferPool.Lease lease)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
protected abstract boolean generate(ByteBufferPool.Lease lease);
|
||||
|
||||
public void reset()
|
||||
private void complete()
|
||||
{
|
||||
failed(new EofException("reset"));
|
||||
if (reset)
|
||||
failed(new EofException("reset"));
|
||||
else
|
||||
succeeded();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -351,7 +330,12 @@ public class HTTP2Flusher extends IteratingCallback
|
|||
callback.failed(x);
|
||||
}
|
||||
|
||||
public boolean isProtocol()
|
||||
private boolean reset()
|
||||
{
|
||||
return this.reset = stream != null && stream.isReset() && !isProtocol();
|
||||
}
|
||||
|
||||
private boolean isProtocol()
|
||||
{
|
||||
switch (frame.getType())
|
||||
{
|
||||
|
|
|
@ -1048,22 +1048,13 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
|
|||
super(frame, stream, callback);
|
||||
}
|
||||
|
||||
public Throwable generate(ByteBufferPool.Lease lease)
|
||||
protected boolean generate(ByteBufferPool.Lease lease)
|
||||
{
|
||||
try
|
||||
{
|
||||
generator.control(lease, frame);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Generated {}", frame);
|
||||
prepare();
|
||||
return null;
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Failure generating frame " + frame, x);
|
||||
return x;
|
||||
}
|
||||
generator.control(lease, frame);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Generated {}", frame);
|
||||
prepare();
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1154,71 +1145,58 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
|
|||
|
||||
private class DataEntry extends HTTP2Flusher.Entry
|
||||
{
|
||||
private int length;
|
||||
private int remaining;
|
||||
private int generated;
|
||||
|
||||
private DataEntry(DataFrame frame, IStream stream, Callback callback)
|
||||
{
|
||||
super(frame, stream, callback);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int dataRemaining()
|
||||
{
|
||||
// We don't do any padding, so the flow control length is
|
||||
// always the data remaining. This simplifies the handling
|
||||
// of data frames that cannot be completely written due to
|
||||
// the flow control window exhausting, since in that case
|
||||
// we would have to count the padding only once.
|
||||
return ((DataFrame)frame).remaining();
|
||||
remaining = frame.remaining();
|
||||
}
|
||||
|
||||
public Throwable generate(ByteBufferPool.Lease lease)
|
||||
@Override
|
||||
public int dataRemaining()
|
||||
{
|
||||
try
|
||||
{
|
||||
int flowControlLength = dataRemaining();
|
||||
return remaining;
|
||||
}
|
||||
|
||||
int sessionSendWindow = getSendWindow();
|
||||
if (sessionSendWindow < 0)
|
||||
throw new IllegalStateException();
|
||||
protected boolean generate(ByteBufferPool.Lease lease)
|
||||
{
|
||||
int toWrite = dataRemaining();
|
||||
|
||||
int streamSendWindow = stream.updateSendWindow(0);
|
||||
if (streamSendWindow < 0)
|
||||
throw new IllegalStateException();
|
||||
int sessionSendWindow = getSendWindow();
|
||||
int streamSendWindow = stream.updateSendWindow(0);
|
||||
int window = Math.min(streamSendWindow, sessionSendWindow);
|
||||
if (window <= 0 && toWrite > 0)
|
||||
return false;
|
||||
|
||||
int window = Math.min(streamSendWindow, sessionSendWindow);
|
||||
int length = Math.min(toWrite, window);
|
||||
|
||||
int length = this.length = Math.min(flowControlLength, window);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Generated {}, length/window={}/{}", frame, length, window);
|
||||
int generated = generator.data(lease, (DataFrame)frame, length);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Generated {}, length/window/data={}/{}/{}", frame, generated, window, toWrite);
|
||||
|
||||
generator.data(lease, (DataFrame)frame, length);
|
||||
flowControl.onDataSending(stream, length);
|
||||
return null;
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Failure generating frame " + frame, x);
|
||||
return x;
|
||||
}
|
||||
this.generated += generated;
|
||||
this.remaining -= generated;
|
||||
|
||||
flowControl.onDataSending(stream, generated);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void succeeded()
|
||||
{
|
||||
flowControl.onDataSent(stream, length);
|
||||
flowControl.onDataSent(stream, generated);
|
||||
generated = 0;
|
||||
// Do we have more to send ?
|
||||
DataFrame dataFrame = (DataFrame)frame;
|
||||
if (dataFrame.remaining() > 0)
|
||||
{
|
||||
// We have written part of the frame, but there is more to write.
|
||||
// The API will not allow to send two data frames for the same
|
||||
// stream so we append the unfinished frame at the end to allow
|
||||
// better interleaving with other streams.
|
||||
flusher.append(this);
|
||||
}
|
||||
else
|
||||
if (dataRemaining() == 0)
|
||||
{
|
||||
// Only now we can update the close state
|
||||
// and eventually remove the stream.
|
||||
|
|
|
@ -36,43 +36,34 @@ public class DataGenerator
|
|||
this.headerGenerator = headerGenerator;
|
||||
}
|
||||
|
||||
public void generate(ByteBufferPool.Lease lease, DataFrame frame, int maxLength)
|
||||
public int generate(ByteBufferPool.Lease lease, DataFrame frame, int maxLength)
|
||||
{
|
||||
generateData(lease, frame.getStreamId(), frame.getData(), frame.isEndStream(), maxLength);
|
||||
return generateData(lease, frame.getStreamId(), frame.getData(), frame.isEndStream(), maxLength);
|
||||
}
|
||||
|
||||
public void generateData(ByteBufferPool.Lease lease, int streamId, ByteBuffer data, boolean last, int maxLength)
|
||||
public int generateData(ByteBufferPool.Lease lease, int streamId, ByteBuffer data, boolean last, int maxLength)
|
||||
{
|
||||
if (streamId < 0)
|
||||
throw new IllegalArgumentException("Invalid stream id: " + streamId);
|
||||
|
||||
int dataLength = data.remaining();
|
||||
int maxFrameSize = headerGenerator.getMaxFrameSize();
|
||||
if (dataLength <= maxLength && dataLength <= maxFrameSize)
|
||||
int length = Math.min(dataLength, Math.min(maxFrameSize, maxLength));
|
||||
if (length == dataLength)
|
||||
{
|
||||
// Single frame.
|
||||
generateFrame(lease, streamId, data, last);
|
||||
return;
|
||||
}
|
||||
|
||||
// Other cases, we need to slice the original buffer into multiple frames.
|
||||
|
||||
int length = Math.min(maxLength, dataLength);
|
||||
int frames = length / maxFrameSize;
|
||||
if (frames * maxFrameSize != length)
|
||||
++frames;
|
||||
|
||||
int begin = data.position();
|
||||
int end = data.limit();
|
||||
for (int i = 1; i <= frames; ++i)
|
||||
else
|
||||
{
|
||||
int limit = begin + Math.min(maxFrameSize * i, length);
|
||||
data.limit(limit);
|
||||
int limit = data.limit();
|
||||
int newLimit = data.position() + length;
|
||||
data.limit(newLimit);
|
||||
ByteBuffer slice = data.slice();
|
||||
data.position(limit);
|
||||
generateFrame(lease, streamId, slice, i == frames && last && limit == end);
|
||||
data.position(newLimit);
|
||||
data.limit(limit);
|
||||
generateFrame(lease, streamId, slice, false);
|
||||
}
|
||||
data.limit(end);
|
||||
return length;
|
||||
}
|
||||
|
||||
private void generateFrame(ByteBufferPool.Lease lease, int streamId, ByteBuffer data, boolean last)
|
||||
|
@ -88,6 +79,7 @@ public class DataGenerator
|
|||
BufferUtil.flipToFlush(header, 0);
|
||||
lease.append(header, true);
|
||||
|
||||
lease.append(data, false);
|
||||
if (data.remaining() > 0)
|
||||
lease.append(data, false);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -80,8 +80,8 @@ public class Generator
|
|||
generators[frame.getType().getType()].generate(lease, frame);
|
||||
}
|
||||
|
||||
public void data(ByteBufferPool.Lease lease, DataFrame frame, int maxLength)
|
||||
public int data(ByteBufferPool.Lease lease, DataFrame frame, int maxLength)
|
||||
{
|
||||
dataGenerator.generate(lease, frame, maxLength);
|
||||
return dataGenerator.generate(lease, frame, maxLength);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -103,7 +103,14 @@ public class DataGenerateParseTest
|
|||
for (int i = 0; i < 2; ++i)
|
||||
{
|
||||
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool);
|
||||
generator.generateData(lease, 13, data.slice(), true, data.remaining());
|
||||
ByteBuffer slice = data.slice();
|
||||
int generated = 0;
|
||||
while (true)
|
||||
{
|
||||
generated += generator.generateData(lease, 13, slice, true, slice.remaining());
|
||||
if (generated == data.remaining())
|
||||
break;
|
||||
}
|
||||
|
||||
frames.clear();
|
||||
for (ByteBuffer buffer : lease.getByteBuffers())
|
||||
|
@ -135,7 +142,14 @@ public class DataGenerateParseTest
|
|||
{
|
||||
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool);
|
||||
ByteBuffer data = ByteBuffer.wrap(largeContent);
|
||||
generator.generateData(lease, 13, data.slice(), true, data.remaining());
|
||||
ByteBuffer slice = data.slice();
|
||||
int generated = 0;
|
||||
while (true)
|
||||
{
|
||||
generated += generator.generateData(lease, 13, slice, true, slice.remaining());
|
||||
if (generated == data.remaining())
|
||||
break;
|
||||
}
|
||||
|
||||
frames.clear();
|
||||
for (ByteBuffer buffer : lease.getByteBuffers())
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
package org.eclipse.jetty.http2.client.http;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Locale;
|
||||
|
||||
import org.eclipse.jetty.client.HttpChannel;
|
||||
|
@ -34,6 +35,8 @@ import org.eclipse.jetty.http2.frames.DataFrame;
|
|||
import org.eclipse.jetty.http2.frames.HeadersFrame;
|
||||
import org.eclipse.jetty.http2.frames.PushPromiseFrame;
|
||||
import org.eclipse.jetty.http2.frames.ResetFrame;
|
||||
import org.eclipse.jetty.io.ByteBufferPool;
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
|
||||
public class HttpReceiverOverHTTP2 extends HttpReceiver implements Stream.Listener
|
||||
|
@ -95,7 +98,42 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements Stream.Listen
|
|||
return;
|
||||
}
|
||||
|
||||
if (responseContent(exchange, frame.getData(), callback))
|
||||
// We must copy the data since we do not know when the
|
||||
// application will consume the bytes and the parsing
|
||||
// will continue as soon as this method returns, eventually
|
||||
// leading to reusing the underlying buffer for more reads.
|
||||
ByteBufferPool byteBufferPool = getHttpDestination().getHttpClient().getByteBufferPool();
|
||||
ByteBuffer original = frame.getData();
|
||||
int length = original.remaining();
|
||||
final ByteBuffer copy = byteBufferPool.acquire(length, original.isDirect());
|
||||
BufferUtil.clearToFill(copy);
|
||||
copy.put(original);
|
||||
BufferUtil.flipToFlush(copy, 0);
|
||||
|
||||
Callback delegate = new Callback()
|
||||
{
|
||||
@Override
|
||||
public boolean isNonBlocking()
|
||||
{
|
||||
return callback.isNonBlocking();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void succeeded()
|
||||
{
|
||||
byteBufferPool.release(copy);
|
||||
callback.succeeded();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(Throwable x)
|
||||
{
|
||||
byteBufferPool.release(copy);
|
||||
callback.failed(x);
|
||||
}
|
||||
};
|
||||
|
||||
if (responseContent(exchange, copy, delegate))
|
||||
{
|
||||
if (frame.isEndStream())
|
||||
responseSuccess(exchange);
|
||||
|
|
|
@ -164,15 +164,17 @@ public class HttpChannelOverHTTP2 extends HttpChannel
|
|||
public Runnable requestContent(DataFrame frame, final Callback callback)
|
||||
{
|
||||
// We must copy the data since we do not know when the
|
||||
// application will consume its bytes (we queue them by
|
||||
// calling onContent()), and we cannot stop the parsing
|
||||
// since there may be frames for other streams.
|
||||
// application will consume the bytes (we queue them by
|
||||
// calling onContent()), and the parsing will continue
|
||||
// as soon as this method returns, eventually leading
|
||||
// to reusing the underlying buffer for more reads.
|
||||
final ByteBufferPool byteBufferPool = getByteBufferPool();
|
||||
ByteBuffer original = frame.getData();
|
||||
int length = original.remaining();
|
||||
final ByteBuffer copy = byteBufferPool.acquire(length, original.isDirect());
|
||||
BufferUtil.clearToFill(copy);
|
||||
copy.put(original).flip();
|
||||
copy.put(original);
|
||||
BufferUtil.flipToFlush(copy, 0);
|
||||
|
||||
boolean handle = onContent(new HttpInput.Content(copy)
|
||||
{
|
||||
|
|
|
@ -106,7 +106,7 @@ public class HttpTransportOverHTTP2 implements HttpTransport
|
|||
public void succeeded()
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("HTTP2 Response #{} committed", stream.getId());
|
||||
LOG.debug("HTTP2 Response #{}/{} committed", stream.getId(), Integer.toHexString(stream.getSession().hashCode()));
|
||||
send(content, lastContent, callback);
|
||||
}
|
||||
|
||||
|
@ -114,7 +114,7 @@ public class HttpTransportOverHTTP2 implements HttpTransport
|
|||
public void failed(Throwable x)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("HTTP2 Response #" + stream.getId() + " failed to commit", x);
|
||||
LOG.debug("HTTP2 Response #" + stream.getId() + "/" + Integer.toHexString(stream.getSession().hashCode()) + " failed to commit", x);
|
||||
callback.failed(x);
|
||||
}
|
||||
});
|
||||
|
@ -159,7 +159,7 @@ public class HttpTransportOverHTTP2 implements HttpTransport
|
|||
}
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("HTTP/2 Push {}",request);
|
||||
LOG.debug("HTTP/2 Push {}", request);
|
||||
|
||||
stream.push(new PushPromiseFrame(stream.getId(), 0, request), new Promise<Stream>()
|
||||
{
|
||||
|
@ -182,8 +182,9 @@ public class HttpTransportOverHTTP2 implements HttpTransport
|
|||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("HTTP2 Response #{}:{}{} {}{}{}",
|
||||
stream.getId(), System.lineSeparator(), HttpVersion.HTTP_2, info.getStatus(),
|
||||
LOG.debug("HTTP2 Response #{}/{}:{}{} {}{}{}",
|
||||
stream.getId(), Integer.toHexString(stream.getSession().hashCode()),
|
||||
System.lineSeparator(), HttpVersion.HTTP_2, info.getStatus(),
|
||||
System.lineSeparator(), info.getFields());
|
||||
}
|
||||
|
||||
|
@ -195,8 +196,9 @@ public class HttpTransportOverHTTP2 implements HttpTransport
|
|||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("HTTP2 Response #{}: {} content bytes{}",
|
||||
stream.getId(), content.remaining(), lastContent ? " (last chunk)" : "");
|
||||
LOG.debug("HTTP2 Response #{}/{}: {} content bytes{}",
|
||||
stream.getId(), Integer.toHexString(stream.getSession().hashCode()),
|
||||
content.remaining(), lastContent ? " (last chunk)" : "");
|
||||
}
|
||||
DataFrame frame = new DataFrame(stream.getId(), content, lastContent);
|
||||
stream.data(frame, callback);
|
||||
|
@ -222,7 +224,8 @@ public class HttpTransportOverHTTP2 implements HttpTransport
|
|||
{
|
||||
IStream stream = this.stream;
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("HTTP2 Response #{} aborted", stream == null ? -1 : stream.getId());
|
||||
LOG.debug("HTTP2 Response #{}/{} aborted", stream == null ? -1 : stream.getId(),
|
||||
stream == null ? -1 : Integer.toHexString(stream.getSession().hashCode()));
|
||||
if (stream != null)
|
||||
stream.reset(new ResetFrame(stream.getId(), ErrorCode.INTERNAL_ERROR.code), Callback.NOOP);
|
||||
}
|
||||
|
|
|
@ -97,6 +97,9 @@ public class InfinispanSessionDataStore extends AbstractSessionDataStore
|
|||
try
|
||||
{
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Loading session {} from infinispan", id);
|
||||
|
||||
SessionData sd = (SessionData)_cache.get(getCacheKey(id, _context));
|
||||
reference.set(sd);
|
||||
}
|
||||
|
@ -201,9 +204,24 @@ public class InfinispanSessionDataStore extends AbstractSessionDataStore
|
|||
@Override
|
||||
public boolean isPassivating()
|
||||
{
|
||||
return true;
|
||||
//TODO run in the _context to ensure classloader is set
|
||||
try
|
||||
{
|
||||
Class<?> remoteClass = Thread.currentThread().getContextClassLoader().loadClass("org.infinispan.client.hotrod.RemoteCache");
|
||||
if (_cache.getClass().isAssignableFrom(remoteClass))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
catch (ClassNotFoundException e)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
public void setInfinispanIdleTimeoutSec (int sec)
|
||||
{
|
||||
_infinispanIdleTimeoutSec = sec;
|
||||
|
|
|
@ -293,24 +293,7 @@ public class InfinispanSessionIdManager extends AbstractSessionIdManager
|
|||
return delete (id);
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* Remove an id from use by telling all contexts to remove a session with this id.
|
||||
*
|
||||
* @see org.eclipse.jetty.server.SessionIdManager#expireAll(java.lang.String)
|
||||
*/
|
||||
@Override
|
||||
public void expireAll(String id)
|
||||
{
|
||||
LOG.debug("Expiring "+id);
|
||||
//take the id out of the list of known sessionids for this node - TODO consider if we didn't remove it from this node
|
||||
//it is because infinispan probably already timed it out. So, we only want to expire it from memory and NOT load it if present
|
||||
removeId(id);
|
||||
//tell all contexts that may have a session object with this id to
|
||||
//get rid of them
|
||||
for (SessionManager manager:getSessionManagers())
|
||||
{
|
||||
manager.invalidate(id);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -58,7 +58,7 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
|
|||
*/
|
||||
protected int _desiredInterestOps;
|
||||
|
||||
|
||||
|
||||
private abstract class RunnableTask implements Runnable
|
||||
{
|
||||
final String _operation;
|
||||
|
@ -66,14 +66,14 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
|
|||
{
|
||||
_operation=op;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return ChannelEndPoint.this.toString()+":"+_operation;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private abstract class RejectableRunnable extends RunnableTask implements Rejectable
|
||||
{
|
||||
RejectableRunnable(String op)
|
||||
|
@ -81,7 +81,7 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
|
|||
super(op);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Override
|
||||
public void reject()
|
||||
{
|
||||
try
|
||||
|
@ -94,7 +94,7 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private final Runnable _runUpdateKey = new RunnableTask("runUpdateKey")
|
||||
{
|
||||
@Override
|
||||
|
@ -127,8 +127,8 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
|
|||
@Override
|
||||
public void run()
|
||||
{
|
||||
getFillInterest().fillable();
|
||||
getWriteFlusher().completeWrite();
|
||||
getFillInterest().fillable();
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -171,7 +171,7 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
|
|||
super.doClose();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void onClose()
|
||||
{
|
||||
|
@ -185,7 +185,7 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
|
|||
_selector.onClose(this);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public int fill(ByteBuffer buffer) throws IOException
|
||||
|
@ -291,7 +291,7 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
|
|||
/**
|
||||
* This method may run concurrently with {@link #changeInterests(int)}.
|
||||
*/
|
||||
|
||||
|
||||
int readyOps = _key.readyOps();
|
||||
int oldInterestOps;
|
||||
int newInterestOps;
|
||||
|
@ -303,15 +303,15 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
|
|||
newInterestOps = oldInterestOps & ~readyOps;
|
||||
_desiredInterestOps = newInterestOps;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
boolean readable = (readyOps & SelectionKey.OP_READ) != 0;
|
||||
boolean writable = (readyOps & SelectionKey.OP_WRITE) != 0;
|
||||
|
||||
|
||||
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("onSelected {}->{} r={} w={} for {}", oldInterestOps, newInterestOps, readable, writable, this);
|
||||
|
||||
|
||||
// Run non-blocking code immediately.
|
||||
// This producer knows that this non-blocking code is special
|
||||
// and that it must be run in this thread and not fed to the
|
||||
|
@ -331,11 +331,11 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
|
|||
_runCompleteWrite.run();
|
||||
writable = false;
|
||||
}
|
||||
|
||||
|
||||
// return task to complete the job
|
||||
Runnable task= readable ? (writable ? _runFillableCompleteWrite : _runFillable)
|
||||
: (writable ? _runCompleteWrite : null);
|
||||
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("task {}",task);
|
||||
return task;
|
||||
|
@ -347,7 +347,7 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
|
|||
/**
|
||||
* This method may run concurrently with {@link #changeInterests(int)}.
|
||||
*/
|
||||
|
||||
|
||||
try
|
||||
{
|
||||
int oldInterestOps;
|
||||
|
@ -363,7 +363,7 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
|
|||
_key.interestOps(newInterestOps);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Key interests updated {} -> {} on {}", oldInterestOps, newInterestOps, this);
|
||||
}
|
||||
|
@ -385,7 +385,7 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
|
|||
* This method may run concurrently with
|
||||
* {@link #updateKey()} and {@link #onSelected()}.
|
||||
*/
|
||||
|
||||
|
||||
int oldInterestOps;
|
||||
int newInterestOps;
|
||||
boolean pending;
|
||||
|
@ -397,14 +397,14 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
|
|||
if (newInterestOps != oldInterestOps)
|
||||
_desiredInterestOps = newInterestOps;
|
||||
}
|
||||
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("changeInterests p={} {}->{} for {}", pending, oldInterestOps, newInterestOps, this);
|
||||
|
||||
|
||||
if (!pending && _selector!=null)
|
||||
_selector.submit(_runUpdateKey);
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
|
@ -427,5 +427,5 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
|
|||
return String.format("%s{io=%s,kio=-2,kro=-2}", super.toString(), _desiredInterestOps);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -61,16 +61,16 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
|||
Interceptor getNextInterceptor();
|
||||
boolean isOptimizedForDirectBuffers();
|
||||
}
|
||||
|
||||
|
||||
private static Logger LOG = Log.getLogger(HttpOutput.class);
|
||||
|
||||
private final HttpChannel _channel;
|
||||
private final SharedBlockingCallback _writeBlock;
|
||||
private Interceptor _interceptor;
|
||||
|
||||
|
||||
/** Bytes written via the write API (excludes bytes written via sendContent). Used to autocommit once content length is written. */
|
||||
private long _written;
|
||||
|
||||
|
||||
private ByteBuffer _aggregate;
|
||||
private int _bufferSize;
|
||||
private int _commitSize;
|
||||
|
@ -115,7 +115,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
|||
_commitSize=_bufferSize;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public HttpChannel getHttpChannel()
|
||||
{
|
||||
return _channel;
|
||||
|
@ -130,7 +130,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
|||
{
|
||||
_interceptor=filter;
|
||||
}
|
||||
|
||||
|
||||
public boolean isWritten()
|
||||
{
|
||||
return _written > 0;
|
||||
|
@ -155,11 +155,11 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
|||
{
|
||||
return _writeBlock.acquire();
|
||||
}
|
||||
|
||||
|
||||
private void write(ByteBuffer content, boolean complete) throws IOException
|
||||
{
|
||||
try (Blocker blocker = _writeBlock.acquire())
|
||||
{
|
||||
{
|
||||
write(content, complete, blocker);
|
||||
blocker.block();
|
||||
}
|
||||
|
@ -336,7 +336,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
|||
|
||||
case ERROR:
|
||||
throw new EofException(_onError);
|
||||
|
||||
|
||||
case CLOSED:
|
||||
return;
|
||||
|
||||
|
@ -400,7 +400,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
|||
|
||||
case ERROR:
|
||||
throw new EofException(_onError);
|
||||
|
||||
|
||||
case CLOSED:
|
||||
throw new EofException("Closed");
|
||||
|
||||
|
@ -502,7 +502,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
|||
|
||||
case ERROR:
|
||||
throw new EofException(_onError);
|
||||
|
||||
|
||||
case CLOSED:
|
||||
throw new EofException("Closed");
|
||||
|
||||
|
@ -584,7 +584,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
|||
|
||||
case ERROR:
|
||||
throw new EofException(_onError);
|
||||
|
||||
|
||||
case CLOSED:
|
||||
throw new EofException("Closed");
|
||||
|
||||
|
@ -614,7 +614,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
|||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("sendContent({})",BufferUtil.toDetailString(content));
|
||||
|
||||
|
||||
write(content, true);
|
||||
closed();
|
||||
}
|
||||
|
@ -694,7 +694,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
|||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("sendContent(buffer={},{})",BufferUtil.toDetailString(content),callback);
|
||||
|
||||
|
||||
write(content, true, new Callback()
|
||||
{
|
||||
@Override
|
||||
|
@ -724,7 +724,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
|||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("sendContent(stream={},{})",in,callback);
|
||||
|
||||
|
||||
new InputStreamWritingCB(in, callback).iterate();
|
||||
}
|
||||
|
||||
|
@ -739,7 +739,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
|||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("sendContent(channel={},{})",in,callback);
|
||||
|
||||
|
||||
new ReadableByteChannelWritingCB(in, callback).iterate();
|
||||
}
|
||||
|
||||
|
@ -753,7 +753,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
|||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("sendContent(http={},{})",httpContent,callback);
|
||||
|
||||
|
||||
if (BufferUtil.hasContent(_aggregate))
|
||||
{
|
||||
callback.failed(new IOException("cannot sendContent() after write()"));
|
||||
|
@ -777,7 +777,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
|||
case ERROR:
|
||||
callback.failed(new EofException(_onError));
|
||||
return;
|
||||
|
||||
|
||||
case CLOSED:
|
||||
callback.failed(new EofException("Closed"));
|
||||
return;
|
||||
|
@ -787,7 +787,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
|||
}
|
||||
break;
|
||||
}
|
||||
|
||||
|
||||
|
||||
ByteBuffer buffer = _channel.useDirectBuffers() ? httpContent.getDirectBuffer() : null;
|
||||
if (buffer == null)
|
||||
|
@ -841,7 +841,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
|||
resetBuffer();
|
||||
_interceptor=_channel;
|
||||
}
|
||||
|
||||
|
||||
public void resetBuffer()
|
||||
{
|
||||
_written = 0;
|
||||
|
@ -897,7 +897,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
|||
|
||||
case ERROR:
|
||||
return true;
|
||||
|
||||
|
||||
case CLOSED:
|
||||
return true;
|
||||
|
||||
|
@ -940,7 +940,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
|||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
switch(_state.get())
|
||||
{
|
||||
case CLOSED:
|
||||
|
@ -961,7 +961,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
|||
_onError = e;
|
||||
}
|
||||
break;
|
||||
|
||||
|
||||
default:
|
||||
_onError=new IllegalStateException("state="+_state.get());
|
||||
}
|
||||
|
@ -1084,7 +1084,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
|||
{
|
||||
_slice=_buffer.duplicate();
|
||||
_buffer.position(_buffer.limit());
|
||||
}
|
||||
}
|
||||
_complete=complete;
|
||||
}
|
||||
|
||||
|
@ -1107,7 +1107,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
|||
BufferUtil.flipToFlush(_aggregate, position);
|
||||
return Action.SUCCEEDED;
|
||||
}
|
||||
|
||||
|
||||
// Is there data left to write?
|
||||
if (_buffer.hasRemaining())
|
||||
{
|
||||
|
@ -1118,7 +1118,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
|||
write(_buffer, _complete, this);
|
||||
return Action.SCHEDULED;
|
||||
}
|
||||
|
||||
|
||||
// otherwise take a slice
|
||||
int p=_buffer.position();
|
||||
int l=Math.min(getBufferSize(),_buffer.remaining());
|
||||
|
@ -1130,7 +1130,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
|||
write(_slice, _complete && _completed, this);
|
||||
return Action.SCHEDULED;
|
||||
}
|
||||
|
||||
|
||||
// all content written, but if we have not yet signal completion, we
|
||||
// need to do so
|
||||
if (_complete && !_completed)
|
||||
|
@ -1190,7 +1190,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
|||
_channel.getByteBufferPool().release(_buffer);
|
||||
return Action.SUCCEEDED;
|
||||
}
|
||||
|
||||
|
||||
// Read until buffer full or EOF
|
||||
int len=0;
|
||||
while (len<_buffer.capacity() && !_eof)
|
||||
|
@ -1240,7 +1240,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
|||
_in=in;
|
||||
_buffer = _channel.getByteBufferPool().acquire(getBufferSize(), _channel.useDirectBuffers());
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected Action process() throws Exception
|
||||
{
|
||||
|
@ -1255,16 +1255,16 @@ public class HttpOutput extends ServletOutputStream implements Runnable
|
|||
_channel.getByteBufferPool().release(_buffer);
|
||||
return Action.SUCCEEDED;
|
||||
}
|
||||
|
||||
|
||||
// Read from stream until buffer full or EOF
|
||||
_buffer.clear();
|
||||
BufferUtil.clearToFill(_buffer);
|
||||
while (_buffer.hasRemaining() && !_eof)
|
||||
_eof = (_in.read(_buffer)) < 0;
|
||||
|
||||
// write what we have
|
||||
_buffer.flip();
|
||||
BufferUtil.flipToFlush(_buffer, 0);
|
||||
write(_buffer,_eof,this);
|
||||
|
||||
|
||||
return Action.SCHEDULED;
|
||||
}
|
||||
|
||||
|
|
|
@ -168,7 +168,7 @@ public class Request implements HttpServletRequest
|
|||
private String _pathInfo;
|
||||
|
||||
private boolean _secure;
|
||||
private boolean _asyncSupported = true;
|
||||
private String _asyncNotSupportedSource = null;
|
||||
private boolean _newContext;
|
||||
private boolean _cookiesExtracted = false;
|
||||
private boolean _handled = false;
|
||||
|
@ -1668,7 +1668,7 @@ public class Request implements HttpServletRequest
|
|||
@Override
|
||||
public boolean isAsyncSupported()
|
||||
{
|
||||
return _asyncSupported;
|
||||
return _asyncNotSupportedSource==null;
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
|
@ -1844,7 +1844,7 @@ public class Request implements HttpServletRequest
|
|||
if (_async!=null)
|
||||
_async.reset();
|
||||
_async=null;
|
||||
_asyncSupported = true;
|
||||
_asyncNotSupportedSource = null;
|
||||
_handled = false;
|
||||
if (_attributes != null)
|
||||
_attributes.clearAttributes();
|
||||
|
@ -1914,9 +1914,9 @@ public class Request implements HttpServletRequest
|
|||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
public void setAsyncSupported(boolean supported)
|
||||
public void setAsyncSupported(boolean supported,String source)
|
||||
{
|
||||
_asyncSupported = supported;
|
||||
_asyncNotSupportedSource = supported?null:(source==null?"unknown":source);
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
|
@ -2236,8 +2236,8 @@ public class Request implements HttpServletRequest
|
|||
@Override
|
||||
public AsyncContext startAsync() throws IllegalStateException
|
||||
{
|
||||
if (!_asyncSupported)
|
||||
throw new IllegalStateException("!asyncSupported");
|
||||
if (_asyncNotSupportedSource!=null)
|
||||
throw new IllegalStateException("!asyncSupported: "+_asyncNotSupportedSource);
|
||||
HttpChannelState state = getHttpChannelState();
|
||||
if (_async==null)
|
||||
_async=new AsyncContextState(state);
|
||||
|
@ -2250,8 +2250,8 @@ public class Request implements HttpServletRequest
|
|||
@Override
|
||||
public AsyncContext startAsync(ServletRequest servletRequest, ServletResponse servletResponse) throws IllegalStateException
|
||||
{
|
||||
if (!_asyncSupported)
|
||||
throw new IllegalStateException("!asyncSupported");
|
||||
if (_asyncNotSupportedSource!=null)
|
||||
throw new IllegalStateException("!asyncSupported: "+_asyncNotSupportedSource);
|
||||
HttpChannelState state = getHttpChannelState();
|
||||
if (_async==null)
|
||||
_async=new AsyncContextState(state);
|
||||
|
|
|
@ -55,7 +55,7 @@ public abstract class AbstractSessionIdManager extends AbstractLifeCycle impleme
|
|||
protected String _workerAttr;
|
||||
protected long _reseed=100000L;
|
||||
protected Server _server;
|
||||
protected SessionScavenger _scavenger;
|
||||
protected PeriodicSessionInspector _scavenger;
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
|
@ -102,7 +102,7 @@ public abstract class AbstractSessionIdManager extends AbstractLifeCycle impleme
|
|||
/**
|
||||
* @param scavenger a SessionScavenger
|
||||
*/
|
||||
public void setSessionScavenger (SessionScavenger scavenger)
|
||||
public void setSessionScavenger (PeriodicSessionInspector scavenger)
|
||||
{
|
||||
_scavenger = scavenger;
|
||||
_scavenger.setSessionIdManager(this);
|
||||
|
@ -285,7 +285,7 @@ public abstract class AbstractSessionIdManager extends AbstractLifeCycle impleme
|
|||
if (_scavenger == null)
|
||||
{
|
||||
LOG.warn("No SessionScavenger set, using defaults");
|
||||
_scavenger = new SessionScavenger();
|
||||
_scavenger = new PeriodicSessionInspector();
|
||||
_scavenger.setSessionIdManager(this);
|
||||
}
|
||||
|
||||
|
@ -388,17 +388,14 @@ public abstract class AbstractSessionIdManager extends AbstractLifeCycle impleme
|
|||
//session data store, AND have listeners called on them.
|
||||
//BUT want to avoid loading into memory sessions that this node is not managing (eg have 3 nodes all running session mgrs,
|
||||
//all 3 find the expired session and load it into memory and expire it
|
||||
if (removeId(id))
|
||||
removeId(id);
|
||||
|
||||
//tell all contexts that may have a session object with this id to
|
||||
//get rid of them
|
||||
for (SessionManager manager:getSessionManagers())
|
||||
{
|
||||
//tell all contexts that may have a session object with this id to
|
||||
//get rid of them
|
||||
for (SessionManager manager:getSessionManagers())
|
||||
{
|
||||
manager.invalidate(id);
|
||||
}
|
||||
manager.invalidate(id);
|
||||
}
|
||||
else if (LOG.isDebugEnabled())
|
||||
LOG.debug("Not present in idmgr: {}", id);
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
|
@ -408,17 +405,16 @@ public abstract class AbstractSessionIdManager extends AbstractLifeCycle impleme
|
|||
public void invalidateAll (String id)
|
||||
{
|
||||
//take the id out of the list of known sessionids for this node
|
||||
if (removeId(id))
|
||||
{
|
||||
//tell all contexts that may have a session object with this id to
|
||||
//get rid of them
|
||||
for (SessionManager manager:getSessionManagers())
|
||||
{
|
||||
manager.invalidate(id);
|
||||
}
|
||||
}
|
||||
removeId(id);
|
||||
|
||||
//tell all contexts that may have a session object with this id to
|
||||
//get rid of them
|
||||
for (SessionManager manager:getSessionManagers())
|
||||
{
|
||||
manager.invalidate(id);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/** Generate a new id for a session and update across
|
||||
|
|
|
@ -20,10 +20,14 @@
|
|||
package org.eclipse.jetty.server.session;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
|
||||
import org.eclipse.jetty.util.MultiException;
|
||||
import org.eclipse.jetty.util.component.AbstractLifeCycle;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
|
@ -43,8 +47,9 @@ public abstract class AbstractSessionStore extends AbstractLifeCycle implements
|
|||
protected StalenessStrategy _staleStrategy;
|
||||
protected SessionManager _manager;
|
||||
protected SessionContext _context;
|
||||
|
||||
|
||||
protected int _idlePassivationTimeoutSec;
|
||||
private IdleInspector _idleInspector;
|
||||
private ExpiryInspector _expiryInspector;
|
||||
|
||||
|
||||
/**
|
||||
|
@ -91,16 +96,7 @@ public abstract class AbstractSessionStore extends AbstractLifeCycle implements
|
|||
* @return true if removed false otherwise
|
||||
*/
|
||||
public abstract Session doDelete (String id);
|
||||
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Get a list of keys for sessions that the store thinks has expired
|
||||
* @return ids of all Session objects that might have expired
|
||||
*/
|
||||
public abstract Set<String> doGetExpiredCandidates();
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
@ -128,7 +124,6 @@ public abstract class AbstractSessionStore extends AbstractLifeCycle implements
|
|||
return _manager;
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* @see org.eclipse.jetty.server.session.SessionStore#initialize(org.eclipse.jetty.server.session.SessionContext)
|
||||
|
@ -158,6 +153,8 @@ public abstract class AbstractSessionStore extends AbstractLifeCycle implements
|
|||
_sessionDataStore.initialize(_context);
|
||||
_sessionDataStore.start();
|
||||
|
||||
_expiryInspector = new ExpiryInspector(this, _manager.getSessionIdManager());
|
||||
|
||||
super.doStart();
|
||||
}
|
||||
|
||||
|
@ -168,6 +165,7 @@ public abstract class AbstractSessionStore extends AbstractLifeCycle implements
|
|||
protected void doStop() throws Exception
|
||||
{
|
||||
_sessionDataStore.stop();
|
||||
_expiryInspector = null;
|
||||
super.doStop();
|
||||
}
|
||||
|
||||
|
@ -204,6 +202,30 @@ public abstract class AbstractSessionStore extends AbstractLifeCycle implements
|
|||
}
|
||||
|
||||
|
||||
/**
|
||||
* @see org.eclipse.jetty.server.session.SessionStore#getIdlePassivationTimeoutSec()
|
||||
*/
|
||||
public int getIdlePassivationTimeoutSec()
|
||||
{
|
||||
return _idlePassivationTimeoutSec;
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* @see org.eclipse.jetty.server.session.SessionStore#setIdlePassivationTimeoutSec(int)
|
||||
*/
|
||||
public void setIdlePassivationTimeoutSec(int idleTimeoutSec)
|
||||
{
|
||||
_idlePassivationTimeoutSec = idleTimeoutSec;
|
||||
if (_idlePassivationTimeoutSec == 0)
|
||||
_idleInspector = null;
|
||||
else if (_idleInspector == null)
|
||||
_idleInspector = new IdleInspector(this);
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Get a session object.
|
||||
*
|
||||
|
@ -219,28 +241,47 @@ public abstract class AbstractSessionStore extends AbstractLifeCycle implements
|
|||
//look locally
|
||||
Session session = doGet(id);
|
||||
|
||||
|
||||
if (staleCheck && isStale(session))
|
||||
{
|
||||
//delete from session store so should reload from session data store
|
||||
doDelete(id);
|
||||
session = null;
|
||||
}
|
||||
|
||||
//not in session store, load the data for the session if possible
|
||||
if (session == null && _sessionDataStore != null)
|
||||
//TODO also check that session is only written out if only the access time changes infrequently
|
||||
|
||||
//session is either not in session store, or it is stale, or its been passivated, load the data for the session if possible
|
||||
if (session == null || (staleCheck && isStale(session)) || session.isPassivated() && _sessionDataStore != null)
|
||||
{
|
||||
SessionData data = _sessionDataStore.load(id);
|
||||
if (data != null)
|
||||
|
||||
//session wasn't in session store
|
||||
if (session == null)
|
||||
{
|
||||
session = newSession(data);
|
||||
session.setSessionManager(_manager);
|
||||
Session existing = doPutIfAbsent(id, session);
|
||||
if (existing != null)
|
||||
if (data != null)
|
||||
{
|
||||
//some other thread has got in first and added the session
|
||||
//so use it
|
||||
session = existing;
|
||||
session = newSession(data);
|
||||
session.setSessionManager(_manager);
|
||||
Session existing = doPutIfAbsent(id, session);
|
||||
if (existing != null)
|
||||
{
|
||||
//some other thread has got in first and added the session
|
||||
//so use it
|
||||
session = existing;
|
||||
}
|
||||
}
|
||||
//else session not in store and not in data store either, so doesn't exist
|
||||
}
|
||||
else
|
||||
{
|
||||
//session was already in session store, refresh it if its still stale/passivated
|
||||
try (Lock lock = session.lock())
|
||||
{
|
||||
if (session.isPassivated() || staleCheck && isStale(session))
|
||||
{
|
||||
//if we were able to load it, then update our session object
|
||||
if (data != null)
|
||||
{
|
||||
session.setPassivated(false);
|
||||
session.getSessionData().copy(data);
|
||||
session.didActivate();
|
||||
}
|
||||
else
|
||||
session = null; //TODO rely on the expiry mechanism to get rid of it?
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -303,11 +344,39 @@ public abstract class AbstractSessionStore extends AbstractLifeCycle implements
|
|||
/**
|
||||
* Remove a session object from this store and from any backing store.
|
||||
*
|
||||
* If session has been passivated, may need to reload it before it can
|
||||
* be properly deleted
|
||||
*
|
||||
* @see org.eclipse.jetty.server.session.SessionStore#delete(java.lang.String)
|
||||
*/
|
||||
@Override
|
||||
public Session delete(String id) throws Exception
|
||||
{
|
||||
//Ensure that the session object is not passivated so that its attributes
|
||||
//are valid
|
||||
Session session = doGet(id);
|
||||
|
||||
//TODO if (session == null) do we want to load it to delete it?
|
||||
if (session != null)
|
||||
{
|
||||
try (Lock lock = session.lock())
|
||||
{
|
||||
//TODO don't check stale on deletion?
|
||||
if (session.isPassivated() && _sessionDataStore != null)
|
||||
{
|
||||
session.setPassivated(false);
|
||||
SessionData data = _sessionDataStore.load(id);
|
||||
if (data != null)
|
||||
{
|
||||
session.getSessionData().copy(data);
|
||||
session.didActivate();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
//Always delete it from the data store
|
||||
if (_sessionDataStore != null)
|
||||
{
|
||||
boolean dsdel = _sessionDataStore.delete(id);
|
||||
|
@ -331,19 +400,93 @@ public abstract class AbstractSessionStore extends AbstractLifeCycle implements
|
|||
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* @see org.eclipse.jetty.server.session.SessionStore#getExpired()
|
||||
* @see org.eclipse.jetty.server.session.SessionStore#checkExpiry(java.util.Set)
|
||||
*/
|
||||
@Override
|
||||
public Set<String> getExpired()
|
||||
public Set<String> checkExpiration(Set<String> candidates)
|
||||
{
|
||||
if (!isStarted())
|
||||
return Collections.emptySet();
|
||||
Set<String> candidates = doGetExpiredCandidates();
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("SessionStore checking expiration on {}", candidates);
|
||||
return _sessionDataStore.getExpired(candidates);
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* @see org.eclipse.jetty.server.session.SessionStore#inspect()
|
||||
*/
|
||||
public void inspect ()
|
||||
{
|
||||
Stream<Session> stream = getStream();
|
||||
try
|
||||
{
|
||||
_expiryInspector.preInspection();
|
||||
if (_idleInspector != null)
|
||||
_idleInspector.preInspection();
|
||||
stream.forEach(s->{_expiryInspector.inspect(s); if (_idleInspector != null) _idleInspector.inspect(s);});
|
||||
_expiryInspector.postInspection();
|
||||
_idleInspector.postInspection();
|
||||
}
|
||||
finally
|
||||
{
|
||||
stream.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* If the SessionDataStore supports passivation, passivate any
|
||||
* sessions that have not be accessed for longer than x sec
|
||||
*
|
||||
* @param id identity of session to passivate
|
||||
*/
|
||||
public void passivateIdleSession(String id)
|
||||
{
|
||||
if (!isStarted())
|
||||
return;
|
||||
|
||||
if (_sessionDataStore == null)
|
||||
return; //no data store to passivate
|
||||
|
||||
if (!_sessionDataStore.isPassivating())
|
||||
return; //doesn't support passivation
|
||||
|
||||
|
||||
//get the session locally
|
||||
Session s = doGet(id);
|
||||
|
||||
if (s == null)
|
||||
{
|
||||
LOG.warn("Session {} not in this session store", s);
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
try (Lock lock = s.lock())
|
||||
{
|
||||
//check the session is still idle first
|
||||
if (s.isValid() && s.isIdleLongerThan(_idlePassivationTimeoutSec))
|
||||
{
|
||||
s.willPassivate();
|
||||
_sessionDataStore.store(id, s.getSessionData());
|
||||
s.getSessionData().clearAllAttributes();
|
||||
s.getSessionData().setDirty(false);
|
||||
}
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
LOG.warn("Passivation of idle session {} failed", id, e);
|
||||
// TODO should do session.invalidate(); ???
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
@ -355,5 +498,4 @@ public abstract class AbstractSessionStore extends AbstractLifeCycle implements
|
|||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,114 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
|
||||
package org.eclipse.jetty.server.session;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
import org.eclipse.jetty.server.SessionIdManager;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
|
||||
/**
|
||||
* ExpiryInspector
|
||||
*
|
||||
*
|
||||
*/
|
||||
public class ExpiryInspector implements SessionInspector
|
||||
{
|
||||
private final static Logger LOG = Log.getLogger("org.eclipse.jetty.server.session");
|
||||
|
||||
protected Set<String> _expiryCandidates;
|
||||
protected SessionIdManager _idManager;
|
||||
protected AbstractSessionStore _sessionStore;
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* @param sessionStore
|
||||
* @param idManager
|
||||
*/
|
||||
public ExpiryInspector (AbstractSessionStore sessionStore, SessionIdManager idManager)
|
||||
{
|
||||
_idManager = idManager;
|
||||
_sessionStore = sessionStore;
|
||||
}
|
||||
|
||||
/**
|
||||
* @see org.eclipse.jetty.server.session.SessionInspector#inspect(org.eclipse.jetty.server.session.SessionStore, org.eclipse.jetty.server.session.Session)
|
||||
*/
|
||||
@Override
|
||||
public void inspect(Session s)
|
||||
{
|
||||
//Does the session object think it is expired?
|
||||
long now = System.currentTimeMillis();
|
||||
if (s.isExpiredAt(now))
|
||||
_expiryCandidates.add(s.getId());
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* @return the expiryCandidates
|
||||
*/
|
||||
public Set<String> getExpiryCandidates()
|
||||
{
|
||||
return _expiryCandidates;
|
||||
}
|
||||
|
||||
/**
|
||||
* @see org.eclipse.jetty.server.session.SessionInspector#preInspection()
|
||||
*/
|
||||
@Override
|
||||
public void preInspection()
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("PreInspection");
|
||||
_expiryCandidates = new HashSet<String>();
|
||||
}
|
||||
|
||||
/**
|
||||
* @see org.eclipse.jetty.server.session.SessionInspector#postInspection()
|
||||
*/
|
||||
@Override
|
||||
public void postInspection()
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("ExpiryInspector checking expiration for {}", _expiryCandidates);
|
||||
|
||||
try
|
||||
{
|
||||
Set<String> candidates = _sessionStore.checkExpiration(_expiryCandidates);
|
||||
for (String id:candidates)
|
||||
{
|
||||
_idManager.expireAll(id);
|
||||
}
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
LOG.warn(e);
|
||||
}
|
||||
finally
|
||||
{
|
||||
_expiryCandidates = null;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -117,6 +117,8 @@ public class FileSessionDataStore extends AbstractSessionDataStore
|
|||
public Set<String> getExpired(Set<String> candidates)
|
||||
{
|
||||
//we don't want to open up each file and check, so just leave it up to the SessionStore
|
||||
//TODO as the session manager is likely to be a lazy loader, if a session is never requested, its
|
||||
//file will stay forever after a restart
|
||||
return candidates;
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,108 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
|
||||
package org.eclipse.jetty.server.session;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
|
||||
|
||||
/**
|
||||
* IdleExpiryInspector
|
||||
*
|
||||
* Checks if a session is idle
|
||||
*/
|
||||
public class IdleInspector implements SessionInspector
|
||||
{
|
||||
private final static Logger LOG = Log.getLogger("org.eclipse.jetty.server.session");
|
||||
|
||||
protected Set<String> _idleCandidates;
|
||||
protected AbstractSessionStore _sessionStore;
|
||||
|
||||
/**
|
||||
* @param sessionStore
|
||||
*/
|
||||
public IdleInspector (AbstractSessionStore sessionStore)
|
||||
{
|
||||
_sessionStore = sessionStore;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @see org.eclipse.jetty.server.session.SessionInspector#inspect(org.eclipse.jetty.server.session.Session)
|
||||
*/
|
||||
@Override
|
||||
public void inspect(Session s)
|
||||
{
|
||||
//Does the session object think it is expired?
|
||||
long now = System.currentTimeMillis();
|
||||
if (s.isExpiredAt(now))
|
||||
return;
|
||||
|
||||
if (s.isValid() && s.isIdleLongerThan(_sessionStore.getIdlePassivationTimeoutSec()))
|
||||
{
|
||||
_idleCandidates.add(s.getId());
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @return the idleCandidates
|
||||
*/
|
||||
public Set<String> getIdleCandidates()
|
||||
{
|
||||
return _idleCandidates;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @see org.eclipse.jetty.server.session.SessionInspector#preInspection()
|
||||
*/
|
||||
@Override
|
||||
public void preInspection()
|
||||
{
|
||||
_idleCandidates = new HashSet<String>();
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* @see org.eclipse.jetty.server.session.SessionInspector#postInspection()
|
||||
*/
|
||||
@Override
|
||||
public void postInspection()
|
||||
{
|
||||
for (String id:_idleCandidates)
|
||||
{
|
||||
try
|
||||
{
|
||||
_sessionStore.passivateIdleSession(id);
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
LOG.warn(e);
|
||||
}
|
||||
}
|
||||
|
||||
_idleCandidates = null;
|
||||
}
|
||||
}
|
|
@ -50,7 +50,7 @@ public class JDBCSessionIdManager extends org.eclipse.jetty.server.session.Abstr
|
|||
|
||||
protected final HashSet<String> _sessionIds = new HashSet<String>();
|
||||
protected Server _server;
|
||||
protected SessionScavenger _scavenger;
|
||||
protected PeriodicSessionInspector _scavenger;
|
||||
|
||||
private DatabaseAdaptor _dbAdaptor = new DatabaseAdaptor();
|
||||
|
||||
|
|
|
@ -22,6 +22,7 @@ package org.eclipse.jetty.server.session;
|
|||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
|
||||
|
@ -150,7 +151,7 @@ public class MemorySessionStore extends AbstractSessionStore
|
|||
}
|
||||
|
||||
|
||||
|
||||
/*
|
||||
|
||||
@Override
|
||||
public Set<String> doGetExpiredCandidates()
|
||||
|
@ -167,7 +168,7 @@ public class MemorySessionStore extends AbstractSessionStore
|
|||
}
|
||||
return candidates;
|
||||
}
|
||||
|
||||
*/
|
||||
|
||||
|
||||
|
||||
|
@ -242,4 +243,14 @@ public class MemorySessionStore extends AbstractSessionStore
|
|||
}
|
||||
|
||||
|
||||
/**
|
||||
* @see org.eclipse.jetty.server.session.SessionStore#getStream()
|
||||
*/
|
||||
@Override
|
||||
public Stream<Session> getStream()
|
||||
{
|
||||
return _sessions.values().stream();
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -35,25 +35,26 @@ import org.eclipse.jetty.util.thread.Scheduler;
|
|||
* There is 1 session scavenger per SessionIdManager/Server instance.
|
||||
*
|
||||
*/
|
||||
public class SessionScavenger extends AbstractLifeCycle
|
||||
public class PeriodicSessionInspector extends AbstractLifeCycle
|
||||
{
|
||||
private final static Logger LOG = Log.getLogger("org.eclipse.jetty.server.session");
|
||||
|
||||
public static final long DEFAULT_SCAVENGE_MS = 1000L * 60 * 10;
|
||||
public static final long DEFAULT_PERIOD_MS = 1000L * 60 * 10;
|
||||
protected SessionIdManager _sessionIdManager;
|
||||
protected Scheduler _scheduler;
|
||||
protected Scheduler.Task _task; //scavenge task
|
||||
protected ScavengerRunner _runner;
|
||||
protected Runner _runner;
|
||||
protected boolean _ownScheduler = false;
|
||||
private long _scavengeIntervalMs = DEFAULT_SCAVENGE_MS;
|
||||
|
||||
private long _intervalMs = DEFAULT_PERIOD_MS;
|
||||
private long _lastTime = 0L;
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* ScavengerRunner
|
||||
* Runner
|
||||
*
|
||||
*/
|
||||
protected class ScavengerRunner implements Runnable
|
||||
protected class Runner implements Runnable
|
||||
{
|
||||
|
||||
@Override
|
||||
|
@ -61,12 +62,12 @@ public class SessionScavenger extends AbstractLifeCycle
|
|||
{
|
||||
try
|
||||
{
|
||||
scavenge();
|
||||
inspect();
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (_scheduler != null && _scheduler.isRunning())
|
||||
_task = _scheduler.schedule(this, _scavengeIntervalMs, TimeUnit.MILLISECONDS);
|
||||
_task = _scheduler.schedule(this, _intervalMs, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -96,6 +97,7 @@ public class SessionScavenger extends AbstractLifeCycle
|
|||
if (!(_sessionIdManager instanceof AbstractSessionIdManager))
|
||||
throw new IllegalStateException ("SessionIdManager is not an AbstractSessionIdManager");
|
||||
|
||||
_lastTime = System.currentTimeMillis(); //set it to a non zero value
|
||||
|
||||
//try and use a common scheduler, fallback to own
|
||||
_scheduler = ((AbstractSessionIdManager)_sessionIdManager).getServer().getBean(Scheduler.class);
|
||||
|
@ -109,7 +111,7 @@ public class SessionScavenger extends AbstractLifeCycle
|
|||
else if (!_scheduler.isStarted())
|
||||
throw new IllegalStateException("Shared scheduler not started");
|
||||
|
||||
setScavengeIntervalSec(getScavengeIntervalSec());
|
||||
setIntervalSec(getIntervalSec());
|
||||
|
||||
super.doStart();
|
||||
}
|
||||
|
@ -138,24 +140,24 @@ public class SessionScavenger extends AbstractLifeCycle
|
|||
* Set the period between scavenge cycles
|
||||
* @param sec
|
||||
*/
|
||||
public void setScavengeIntervalSec (long sec)
|
||||
public void setIntervalSec (long sec)
|
||||
{
|
||||
if (sec<=0)
|
||||
sec=60;
|
||||
|
||||
long old_period=_scavengeIntervalMs;
|
||||
long old_period=_intervalMs;
|
||||
long period=sec*1000L;
|
||||
|
||||
_scavengeIntervalMs=period;
|
||||
_intervalMs=period;
|
||||
|
||||
//add a bit of variability into the scavenge time so that not all
|
||||
//nodes with the same scavenge interval sync up
|
||||
long tenPercent = _scavengeIntervalMs/10;
|
||||
long tenPercent = _intervalMs/10;
|
||||
if ((System.currentTimeMillis()%2) == 0)
|
||||
_scavengeIntervalMs += tenPercent;
|
||||
_intervalMs += tenPercent;
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Scavenging every "+_scavengeIntervalMs+" ms");
|
||||
LOG.debug("Inspecting every "+_intervalMs+" ms");
|
||||
|
||||
synchronized (this)
|
||||
{
|
||||
|
@ -164,8 +166,8 @@ public class SessionScavenger extends AbstractLifeCycle
|
|||
if (_task!=null)
|
||||
_task.cancel();
|
||||
if (_runner == null)
|
||||
_runner = new ScavengerRunner();
|
||||
_task = _scheduler.schedule(_runner,_scavengeIntervalMs,TimeUnit.MILLISECONDS);
|
||||
_runner = new Runner();
|
||||
_task = _scheduler.schedule(_runner,_intervalMs,TimeUnit.MILLISECONDS);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -173,13 +175,13 @@ public class SessionScavenger extends AbstractLifeCycle
|
|||
|
||||
|
||||
/**
|
||||
* Get the period between scavenge cycles.
|
||||
* Get the period between inspection cycles.
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
public long getScavengeIntervalSec ()
|
||||
public long getIntervalSec ()
|
||||
{
|
||||
return _scavengeIntervalMs/1000;
|
||||
return _intervalMs/1000;
|
||||
}
|
||||
|
||||
|
||||
|
@ -189,26 +191,33 @@ public class SessionScavenger extends AbstractLifeCycle
|
|||
* ask all SessionManagers to find sessions they think have expired and then make
|
||||
* sure that a session sharing the same id is expired on all contexts
|
||||
*/
|
||||
public void scavenge ()
|
||||
public void inspect ()
|
||||
{
|
||||
//don't attempt to scavenge if we are shutting down
|
||||
if (isStopping() || isStopped())
|
||||
return;
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Scavenging sessions");
|
||||
LOG.debug("Inspecting sessions");
|
||||
|
||||
long now = System.currentTimeMillis();
|
||||
|
||||
//find the session managers
|
||||
for (SessionManager manager:((AbstractSessionIdManager)_sessionIdManager).getSessionManagers())
|
||||
{
|
||||
if (manager != null)
|
||||
{
|
||||
manager.inspect();
|
||||
|
||||
/*
|
||||
//call scavenge on each manager to find keys for sessions that have expired
|
||||
Set<String> expiredKeys = manager.scavenge();
|
||||
|
||||
//for each expired session, tell the session id manager to invalidate its key on all contexts
|
||||
for (String key:expiredKeys)
|
||||
{
|
||||
|
||||
// if it recently expired
|
||||
try
|
||||
{
|
||||
((AbstractSessionIdManager)_sessionIdManager).expireAll(key);
|
||||
|
@ -217,7 +226,7 @@ public class SessionScavenger extends AbstractLifeCycle
|
|||
{
|
||||
LOG.warn(e);
|
||||
}
|
||||
}
|
||||
}*/
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -229,7 +238,7 @@ public class SessionScavenger extends AbstractLifeCycle
|
|||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return super.toString()+"[interval="+_scavengeIntervalMs+", ownscheduler="+_ownScheduler+"]";
|
||||
return super.toString()+"[interval="+_intervalMs+", ownscheduler="+_ownScheduler+"]";
|
||||
}
|
||||
|
||||
}
|
|
@ -65,6 +65,9 @@ public class Session implements SessionManager.SessionIf
|
|||
private boolean _newSession;
|
||||
private State _state = State.VALID; //state of the session:valid,invalid or being invalidated
|
||||
private Locker _lock = new Locker();
|
||||
|
||||
|
||||
private boolean _isPassivated;
|
||||
|
||||
public Session (HttpServletRequest request, SessionData data)
|
||||
{
|
||||
|
@ -147,6 +150,17 @@ public class Session implements SessionManager.SessionIf
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
protected boolean isIdleLongerThan (int sec)
|
||||
{
|
||||
long now = System.currentTimeMillis();
|
||||
try (Lock lock = _lock.lockIfNotHeld())
|
||||
{
|
||||
return ((_sessionData.getAccessed() + (sec*1000)) < now);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* @param nodename
|
||||
|
@ -520,7 +534,7 @@ public class Session implements SessionManager.SessionIf
|
|||
public void setAttribute(String name, Object value)
|
||||
{
|
||||
Object old=null;
|
||||
try (Lock lock = lock())
|
||||
try (Lock lock = _lock.lockIfNotHeld())
|
||||
{
|
||||
//if session is not valid, don't accept the set
|
||||
checkValidForWrite();
|
||||
|
@ -578,7 +592,7 @@ public class Session implements SessionManager.SessionIf
|
|||
|
||||
String id = null;
|
||||
String extendedId = null;
|
||||
try (Lock lock = lock())
|
||||
try (Lock lock = _lock.lockIfNotHeld())
|
||||
{
|
||||
checkValidForWrite(); //don't renew id on a session that is not valid
|
||||
id = _sessionData.getId(); //grab the values as they are now
|
||||
|
@ -601,7 +615,7 @@ public class Session implements SessionManager.SessionIf
|
|||
*/
|
||||
public void renewId (String oldId, String oldExtendedId, String newId, String newExtendedId)
|
||||
{
|
||||
try (Lock lock = lock())
|
||||
try (Lock lock = _lock.lockIfNotHeld())
|
||||
{
|
||||
checkValidForWrite(); //can't change id on invalid session
|
||||
|
||||
|
@ -632,7 +646,7 @@ public class Session implements SessionManager.SessionIf
|
|||
|
||||
boolean result = false;
|
||||
|
||||
try (Lock lock = lock())
|
||||
try (Lock lock = _lock.lockIfNotHeld())
|
||||
{
|
||||
switch (_state)
|
||||
{
|
||||
|
@ -669,7 +683,7 @@ public class Session implements SessionManager.SessionIf
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
/* ------------------------------------------------------------- */
|
||||
/** Grab the lock on the session
|
||||
|
@ -687,7 +701,7 @@ public class Session implements SessionManager.SessionIf
|
|||
/* ------------------------------------------------------------- */
|
||||
protected void doInvalidate() throws IllegalStateException
|
||||
{
|
||||
try (Lock lock = lock())
|
||||
try (Lock lock = _lock.lockIfNotHeld())
|
||||
{
|
||||
try
|
||||
{
|
||||
|
@ -769,4 +783,26 @@ public class Session implements SessionManager.SessionIf
|
|||
{
|
||||
return _sessionData;
|
||||
}
|
||||
|
||||
|
||||
/* ------------------------------------------------------------- */
|
||||
/**
|
||||
* @return
|
||||
*/
|
||||
public boolean isPassivated()
|
||||
{
|
||||
return _isPassivated;
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------- */
|
||||
/**
|
||||
* @param isPassivated
|
||||
*/
|
||||
public void setPassivated(boolean isPassivated)
|
||||
{
|
||||
_isPassivated = isPassivated;
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -83,6 +83,36 @@ public class SessionData implements Serializable
|
|||
_maxInactiveMs = maxInactiveMs;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Copy the info from the given sessiondata
|
||||
*
|
||||
* @param data the sessiondata to be copied
|
||||
*/
|
||||
public void copy (SessionData data)
|
||||
{
|
||||
if (data == null)
|
||||
return; //don't copy if no data
|
||||
|
||||
if (data.getId() == null || !(getId().equals(data.getId())))
|
||||
throw new IllegalStateException ("Can only copy data for same session id");
|
||||
|
||||
if (data == this)
|
||||
return; //don't copy ourself
|
||||
|
||||
setLastNode(data.getLastNode());
|
||||
setContextPath(data.getContextPath());
|
||||
setVhost(data.getVhost());
|
||||
setCookieSet(data.getCookieSet());
|
||||
setCreated(data.getCreated());
|
||||
setAccessed(data.getAccessed());
|
||||
setLastAccessed(data.getLastAccessed());
|
||||
setMaxInactiveMs(data.getMaxInactiveMs());
|
||||
setExpiry(data.getExpiry());
|
||||
setLastSaved(data.getLastSaved());
|
||||
clearAllAttributes();
|
||||
putAllAttributes(data.getAllAttributes());
|
||||
}
|
||||
|
||||
/**
|
||||
* @return time at which session was last written out
|
||||
|
@ -170,6 +200,14 @@ public class SessionData implements Serializable
|
|||
_attributes.putAll(attributes);
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove all attributes
|
||||
*/
|
||||
public void clearAllAttributes ()
|
||||
{
|
||||
_attributes.clear();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return an unmodifiable map of the attributes
|
||||
*/
|
||||
|
@ -322,12 +360,6 @@ public class SessionData implements Serializable
|
|||
_lastAccessed = lastAccessed;
|
||||
}
|
||||
|
||||
/* public boolean isInvalid()
|
||||
{
|
||||
return _invalid;
|
||||
}
|
||||
*/
|
||||
|
||||
|
||||
/**
|
||||
* @return
|
||||
|
|
|
@ -0,0 +1,34 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
|
||||
package org.eclipse.jetty.server.session;
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* SessionInspector
|
||||
*
|
||||
*
|
||||
*/
|
||||
public interface SessionInspector
|
||||
{
|
||||
public void preInspection();
|
||||
public void inspect(Session s);
|
||||
public void postInspection (); //on completion
|
||||
}
|
|
@ -1017,13 +1017,14 @@ public class SessionManager extends ContainerLifeCycle implements org.eclipse.je
|
|||
/**
|
||||
* @return
|
||||
*/
|
||||
public Set<String> scavenge ()
|
||||
public void inspect ()
|
||||
{
|
||||
//don't attempt to scavenge if we are shutting down
|
||||
if (isStopping() || isStopped())
|
||||
return Collections.emptySet();
|
||||
return;
|
||||
|
||||
return _sessionStore.getExpired();
|
||||
_sessionStore.inspect();
|
||||
//return _sessionStore.getExpired();
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -19,7 +19,9 @@
|
|||
|
||||
package org.eclipse.jetty.server.session;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
|
||||
|
@ -43,5 +45,9 @@ public interface SessionStore extends LifeCycle
|
|||
boolean exists (String id) throws Exception;
|
||||
Session delete (String id) throws Exception;
|
||||
void shutdown ();
|
||||
Set<String> getExpired ();
|
||||
Set<String> checkExpiration (Set<String> candidates);
|
||||
void inspect();
|
||||
void setIdlePassivationTimeoutSec(int sec);
|
||||
int getIdlePassivationTimeoutSec();
|
||||
Stream<Session> getStream();
|
||||
}
|
||||
|
|
|
@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
|
|||
import java.util.Enumeration;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import javax.servlet.SessionCookieConfig;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
|
@ -113,14 +114,16 @@ public class SessionCookieTest
|
|||
}
|
||||
|
||||
/**
|
||||
* @see org.eclipse.jetty.server.session.AbstractSessionStore#doGetExpiredCandidates()
|
||||
* @see org.eclipse.jetty.server.session.SessionStore#getStream()
|
||||
*/
|
||||
@Override
|
||||
public Set<String> doGetExpiredCandidates()
|
||||
public Stream<Session> getStream()
|
||||
{
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -1521,17 +1521,21 @@ public class ServletHandler extends ScopedHandler
|
|||
//if the request already does not support async, then the setting for the filter
|
||||
//is irrelevant. However if the request supports async but this filter does not
|
||||
//temporarily turn it off for the execution of the filter
|
||||
boolean requestAsyncSupported = baseRequest.isAsyncSupported();
|
||||
try
|
||||
{
|
||||
if (!_filterHolder.isAsyncSupported() && requestAsyncSupported)
|
||||
baseRequest.setAsyncSupported(false);
|
||||
if (baseRequest.isAsyncSupported() && !_filterHolder.isAsyncSupported())
|
||||
{
|
||||
try
|
||||
{
|
||||
baseRequest.setAsyncSupported(false,_filterHolder.toString());
|
||||
filter.doFilter(request, response, _next);
|
||||
}
|
||||
finally
|
||||
{
|
||||
baseRequest.setAsyncSupported(true,null);
|
||||
}
|
||||
}
|
||||
else
|
||||
filter.doFilter(request, response, _next);
|
||||
}
|
||||
finally
|
||||
{
|
||||
baseRequest.setAsyncSupported(requestAsyncSupported);
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -1594,17 +1598,21 @@ public class ServletHandler extends ScopedHandler
|
|||
//if the request already does not support async, then the setting for the filter
|
||||
//is irrelevant. However if the request supports async but this filter does not
|
||||
//temporarily turn it off for the execution of the filter
|
||||
boolean requestAsyncSupported = _baseRequest.isAsyncSupported();
|
||||
try
|
||||
if (!holder.isAsyncSupported() && _baseRequest.isAsyncSupported())
|
||||
{
|
||||
if (!holder.isAsyncSupported() && requestAsyncSupported)
|
||||
_baseRequest.setAsyncSupported(false);
|
||||
try
|
||||
{
|
||||
_baseRequest.setAsyncSupported(false,holder.toString());
|
||||
filter.doFilter(request, response, this);
|
||||
}
|
||||
finally
|
||||
{
|
||||
_baseRequest.setAsyncSupported(true,null);
|
||||
}
|
||||
}
|
||||
else
|
||||
filter.doFilter(request, response, this);
|
||||
}
|
||||
finally
|
||||
{
|
||||
_baseRequest.setAsyncSupported(requestAsyncSupported);
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
|
@ -822,10 +822,20 @@ public class ServletHolder extends Holder<Servlet> implements UserIdentity.Scope
|
|||
if (_identityService!=null)
|
||||
old_run_as=_identityService.setRunAs(baseRequest.getResolvedUserIdentity(),_runAsToken);
|
||||
|
||||
if (!isAsyncSupported())
|
||||
baseRequest.setAsyncSupported(false);
|
||||
|
||||
servlet.service(request,response);
|
||||
if (baseRequest.isAsyncSupported() && !isAsyncSupported())
|
||||
{
|
||||
try
|
||||
{
|
||||
baseRequest.setAsyncSupported(false,this.toString());
|
||||
servlet.service(request,response);
|
||||
}
|
||||
finally
|
||||
{
|
||||
baseRequest.setAsyncSupported(true,null);
|
||||
}
|
||||
}
|
||||
else
|
||||
servlet.service(request,response);
|
||||
}
|
||||
catch(UnavailableException e)
|
||||
{
|
||||
|
@ -834,8 +844,6 @@ public class ServletHolder extends Holder<Servlet> implements UserIdentity.Scope
|
|||
}
|
||||
finally
|
||||
{
|
||||
baseRequest.setAsyncSupported(suspendable);
|
||||
|
||||
// Pop run-as role.
|
||||
if (_identityService!=null)
|
||||
_identityService.unsetRunAs(old_run_as);
|
||||
|
|
|
@ -56,6 +56,7 @@ import org.eclipse.jetty.toolchain.test.AdvancedRunner;
|
|||
import org.eclipse.jetty.util.IO;
|
||||
import org.eclipse.jetty.util.URIUtil;
|
||||
import org.eclipse.jetty.util.component.AbstractLifeCycle;
|
||||
import org.eclipse.jetty.util.log.StdErrLog;
|
||||
import org.hamcrest.Matchers;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
|
@ -63,6 +64,7 @@ import org.junit.Before;
|
|||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
|
||||
import static org.eclipse.jetty.util.log.Log.getLogger;
|
||||
import static org.hamcrest.Matchers.contains;
|
||||
import static org.hamcrest.Matchers.startsWith;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
@ -125,6 +127,9 @@ public class AsyncServletTest
|
|||
_servletHandler.addServletWithMapping(holder,"/path2/*");
|
||||
_servletHandler.addServletWithMapping(holder,"/p th3/*");
|
||||
_servletHandler.addServletWithMapping(new ServletHolder(new FwdServlet()),"/fwd/*");
|
||||
ServletHolder holder2=new ServletHolder("NoAsync",_servlet);
|
||||
holder2.setAsyncSupported(false);
|
||||
_servletHandler.addServletWithMapping(holder2,"/noasync/*");
|
||||
_server.start();
|
||||
_port=_connector.getLocalPort();
|
||||
__history.clear();
|
||||
|
@ -176,7 +181,45 @@ public class AsyncServletTest
|
|||
|
||||
assertContains("NORMAL",response);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAsyncNotSupportedNoAsync() throws Exception
|
||||
{
|
||||
_expectedCode="200 ";
|
||||
String response=process("noasync","",null);
|
||||
Assert.assertThat(response,Matchers.startsWith("HTTP/1.1 200 OK"));
|
||||
assertThat(__history,contains(
|
||||
"REQUEST /ctx/noasync/info",
|
||||
"initial"
|
||||
));
|
||||
|
||||
assertContains("NORMAL",response);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAsyncNotSupportedAsync() throws Exception
|
||||
{
|
||||
((StdErrLog)getLogger(ServletHandler.class)).setHideStacks(true);
|
||||
try
|
||||
{
|
||||
_expectedCode="500 ";
|
||||
String response=process("noasync","start=200",null);
|
||||
Assert.assertThat(response,Matchers.startsWith("HTTP/1.1 500 "));
|
||||
assertThat(__history,contains(
|
||||
"REQUEST /ctx/noasync/info",
|
||||
"initial"
|
||||
));
|
||||
|
||||
assertContains("HTTP ERROR: 500",response);
|
||||
assertContains("!asyncSupported",response);
|
||||
assertContains("AsyncServletTest$AsyncServlet",response);
|
||||
}
|
||||
finally
|
||||
{
|
||||
((StdErrLog)getLogger(ServletHandler.class)).setHideStacks(false);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStart() throws Exception
|
||||
{
|
||||
|
|
|
@ -19,22 +19,13 @@
|
|||
package org.eclipse.jetty.util;
|
||||
|
||||
|
||||
import static org.hamcrest.CoreMatchers.is;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertThat;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.FileWriter;
|
||||
import java.io.IOException;
|
||||
import java.nio.BufferOverflowException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.FileChannel;
|
||||
import java.nio.channels.FileChannel.MapMode;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.OpenOption;
|
||||
import java.util.Arrays;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
|
||||
|
@ -44,6 +35,12 @@ import org.junit.Assert;
|
|||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.is;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertThat;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class BufferUtilTest
|
||||
{
|
||||
@Test
|
||||
|
@ -159,7 +156,7 @@ public class BufferUtilTest
|
|||
assertEquals(2,from.remaining());
|
||||
assertEquals("1234567890",BufferUtil.toString(to));
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
@Test
|
||||
|
@ -172,7 +169,7 @@ public class BufferUtilTest
|
|||
assertEquals("123",BufferUtil.toString(to));
|
||||
BufferUtil.append(to,from.array(),3,2);
|
||||
assertEquals("12345",BufferUtil.toString(to));
|
||||
|
||||
|
||||
try
|
||||
{
|
||||
BufferUtil.append(to,from.array(),0,5);
|
||||
|
@ -181,7 +178,7 @@ public class BufferUtilTest
|
|||
catch(BufferOverflowException e)
|
||||
{}
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Test
|
||||
public void testPutDirect() throws Exception
|
||||
|
@ -296,7 +293,7 @@ public class BufferUtilTest
|
|||
int capacity = BufferUtil.TEMP_BUFFER_SIZE*2+1024;
|
||||
testWriteToWithBufferThatDoesNotExposeArray(capacity);
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Test
|
||||
public void testEnsureCapacity() throws Exception
|
||||
|
@ -305,13 +302,13 @@ public class BufferUtilTest
|
|||
assertTrue(b==BufferUtil.ensureCapacity(b, 0));
|
||||
assertTrue(b==BufferUtil.ensureCapacity(b, 10));
|
||||
assertTrue(b==BufferUtil.ensureCapacity(b, b.capacity()));
|
||||
|
||||
|
||||
|
||||
ByteBuffer b1 = BufferUtil.ensureCapacity(b, 64);
|
||||
assertTrue(b!=b1);
|
||||
assertEquals(64, b1.capacity());
|
||||
assertEquals("Goodbye Cruel World", BufferUtil.toString(b1));
|
||||
|
||||
|
||||
b1.position(8);
|
||||
b1.limit(13);
|
||||
assertEquals("Cruel", BufferUtil.toString(b1));
|
||||
|
@ -328,9 +325,9 @@ public class BufferUtilTest
|
|||
assertEquals(64, b3.capacity());
|
||||
assertEquals("Cruel", BufferUtil.toString(b3));
|
||||
assertEquals(0, b3.arrayOffset());
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
private void testWriteToWithBufferThatDoesNotExposeArray(int capacity) throws IOException
|
||||
{
|
||||
|
@ -342,7 +339,7 @@ public class BufferUtilTest
|
|||
BufferUtil.writeTo(buffer.asReadOnlyBuffer(), out);
|
||||
assertThat("Bytes in out equal bytes in buffer", Arrays.equals(bytes, out.toByteArray()), is(true));
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testMappedFile() throws Exception
|
||||
{
|
||||
|
@ -353,26 +350,26 @@ public class BufferUtilTest
|
|||
{
|
||||
out.write(data);
|
||||
}
|
||||
|
||||
|
||||
ByteBuffer mapped = BufferUtil.toMappedBuffer(file);
|
||||
assertEquals(data,BufferUtil.toString(mapped));
|
||||
assertTrue(BufferUtil.isMappedBuffer(mapped));
|
||||
|
||||
|
||||
ByteBuffer direct = BufferUtil.allocateDirect(data.length());
|
||||
direct.clear();
|
||||
BufferUtil.clearToFill(direct);
|
||||
direct.put(data.getBytes(StandardCharsets.ISO_8859_1));
|
||||
direct.flip();
|
||||
BufferUtil.flipToFlush(direct, 0);
|
||||
assertEquals(data,BufferUtil.toString(direct));
|
||||
assertFalse(BufferUtil.isMappedBuffer(direct));
|
||||
|
||||
|
||||
ByteBuffer slice = direct.slice();
|
||||
assertEquals(data,BufferUtil.toString(slice));
|
||||
assertFalse(BufferUtil.isMappedBuffer(slice));
|
||||
|
||||
|
||||
ByteBuffer duplicate = direct.duplicate();
|
||||
assertEquals(data,BufferUtil.toString(duplicate));
|
||||
assertFalse(BufferUtil.isMappedBuffer(duplicate));
|
||||
|
||||
|
||||
ByteBuffer readonly = direct.asReadOnlyBuffer();
|
||||
assertEquals(data,BufferUtil.toString(readonly));
|
||||
assertFalse(BufferUtil.isMappedBuffer(readonly));
|
||||
|
|
|
@ -71,6 +71,7 @@ import static org.junit.Assert.assertThat;
|
|||
public class HttpClientLoadTest extends AbstractTest
|
||||
{
|
||||
private final Logger logger = Log.getLogger(HttpClientLoadTest.class);
|
||||
private final AtomicLong requestCount = new AtomicLong();
|
||||
private final AtomicLong connectionLeaks = new AtomicLong();
|
||||
|
||||
public HttpClientLoadTest(Transport transport)
|
||||
|
@ -236,7 +237,7 @@ public class HttpClientLoadTest extends AbstractTest
|
|||
logger.info("{} requests in {} ms, {} req/s", iterations, elapsed, elapsed > 0 ? iterations * 1000 / elapsed : -1);
|
||||
|
||||
for (String failure : failures)
|
||||
System.err.println("FAILED: "+failure);
|
||||
logger.info("FAILED: {}", failure);
|
||||
|
||||
Assert.assertTrue(failures.toString(), failures.isEmpty());
|
||||
}
|
||||
|
@ -267,8 +268,10 @@ public class HttpClientLoadTest extends AbstractTest
|
|||
|
||||
private void test(String scheme, String host, String method, boolean clientClose, boolean serverClose, int contentLength, final boolean checkContentLength, final CountDownLatch latch, final List<String> failures)
|
||||
{
|
||||
long requestId = requestCount.incrementAndGet();
|
||||
Request request = client.newRequest(host, connector.getLocalPort())
|
||||
.scheme(scheme)
|
||||
.path("/" + requestId)
|
||||
.method(method);
|
||||
|
||||
if (clientClose)
|
||||
|
@ -326,7 +329,8 @@ public class HttpClientLoadTest extends AbstractTest
|
|||
latch.countDown();
|
||||
}
|
||||
});
|
||||
await(requestLatch, 5, TimeUnit.SECONDS);
|
||||
if (!await(requestLatch, 5, TimeUnit.SECONDS))
|
||||
logger.warn("Request {} took too long", requestId);
|
||||
}
|
||||
|
||||
private boolean await(CountDownLatch latch, long time, TimeUnit unit)
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.eclipse.jetty.server.session;
|
|||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.PrintWriter;
|
||||
|
@ -76,7 +77,7 @@ public abstract class AbstractImmortalSessionTest
|
|||
|
||||
// Let's wait for the scavenger to run, waiting 2.5 times the scavenger period
|
||||
Thread.sleep(scavengePeriod * 2500L);
|
||||
|
||||
|
||||
// Be sure the session is still there
|
||||
Request request = client.newRequest("http://localhost:" + port + contextPath + servletMapping + "?action=get");
|
||||
request.header("Cookie", sessionCookie);
|
||||
|
@ -114,7 +115,8 @@ public abstract class AbstractImmortalSessionTest
|
|||
}
|
||||
else if ("get".equals(action))
|
||||
{
|
||||
HttpSession session = request.getSession(false);
|
||||
HttpSession session = request.getSession(false);
|
||||
assertNotNull(session);
|
||||
if (session!=null)
|
||||
result = (String)session.getAttribute("value");
|
||||
}
|
||||
|
|
|
@ -42,7 +42,7 @@ public abstract class AbstractTestServer
|
|||
protected final int _scavengePeriod;
|
||||
protected final ContextHandlerCollection _contexts;
|
||||
protected SessionIdManager _sessionIdManager;
|
||||
private SessionScavenger _scavenger;
|
||||
private PeriodicSessionInspector _scavenger;
|
||||
|
||||
|
||||
|
||||
|
@ -83,8 +83,8 @@ public abstract class AbstractTestServer
|
|||
_sessionIdManager = newSessionIdManager(sessionIdMgrConfig);
|
||||
_server.setSessionIdManager(_sessionIdManager);
|
||||
((AbstractSessionIdManager) _sessionIdManager).setServer(_server);
|
||||
_scavenger = new SessionScavenger();
|
||||
_scavenger.setScavengeIntervalSec(scavengePeriod);
|
||||
_scavenger = new PeriodicSessionInspector();
|
||||
_scavenger.setIntervalSec(scavengePeriod);
|
||||
((AbstractSessionIdManager)_sessionIdManager).setSessionScavenger(_scavenger);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue