Merged branch 'jetty-9.3.x' into 'master'.

This commit is contained in:
Simone Bordet 2015-12-21 13:02:04 +01:00
commit 3fb354f884
26 changed files with 589 additions and 132 deletions

View File

@ -23,6 +23,8 @@ import java.net.HttpCookie;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.client.api.Authentication;
import org.eclipse.jetty.client.api.Connection;
@ -35,11 +37,15 @@ import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public abstract class HttpConnection implements Connection
{
private static final Logger LOG = Log.getLogger(HttpConnection.class);
private static final HttpField CHUNKED_FIELD = new HttpField(HttpHeader.TRANSFER_ENCODING, HttpHeaderValue.CHUNKED);
private final AtomicInteger idleTimeoutState = new AtomicInteger();
private final HttpDestination destination;
protected HttpConnection(HttpDestination destination)
@ -72,10 +78,12 @@ public abstract class HttpConnection implements Connection
HttpExchange exchange = new HttpExchange(getHttpDestination(), (HttpRequest)request, listeners);
send(exchange);
SendFailure result = send(exchange);
if (result != null)
request.abort(result.failure);
}
protected abstract void send(HttpExchange exchange);
protected abstract SendFailure send(HttpExchange exchange);
protected void normalizeRequest(Request request)
{
@ -167,6 +175,54 @@ public abstract class HttpConnection implements Connection
return builder;
}
protected SendFailure send(HttpChannel channel, HttpExchange exchange)
{
// Forbid idle timeouts for the time window where
// the request is associated to the channel and sent.
// Use a counter to support multiplexed requests.
boolean send = false;
while (true)
{
int current = idleTimeoutState.get();
if (current < 0)
break;
if (idleTimeoutState.compareAndSet(current, current + 1))
{
send = true;
break;
}
}
if (send)
{
HttpRequest request = exchange.getRequest();
SendFailure result;
if (channel.associate(exchange))
{
channel.send();
result = null;
}
else
{
channel.release();
result = new SendFailure(new HttpRequestException("Could not associate request to connection", request), false);
}
idleTimeoutState.decrementAndGet();
return result;
}
else
{
return new SendFailure(new TimeoutException(), true);
}
}
public boolean onIdleTimeout()
{
if (LOG.isDebugEnabled())
LOG.debug("Idle timeout state {} - {}", idleTimeoutState, this);
return idleTimeoutState.compareAndSet(0, -1);
}
@Override
public String toString()
{

View File

@ -320,13 +320,25 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
}
else
{
send(connection, exchange);
SendFailure result = send(connection, exchange);
if (result != null)
{
if (LOG.isDebugEnabled())
LOG.debug("Send failed {} for {}", result, exchange);
if (result.retry)
{
if (enqueue(getHttpExchanges(), exchange))
return true;
}
request.abort(result.failure);
}
}
return getHttpExchanges().peek() != null;
}
}
protected abstract void send(Connection connection, HttpExchange exchange);
protected abstract SendFailure send(Connection connection, HttpExchange exchange);
public void newConnection(Promise<Connection> promise)
{

View File

@ -0,0 +1,37 @@
//
// ========================================================================
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client;
public class SendFailure
{
public final Throwable failure;
public final boolean retry;
public SendFailure(Throwable failure, boolean retry)
{
this.failure = failure;
this.retry = retry;
}
@Override
public String toString()
{
return String.format("%s[failure=%s,retry=%b]", super.toString(), failure, retry);
}
}

View File

@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.client.HttpConnection;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.SendFailure;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
@ -77,9 +78,9 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec
delegate.send(request, listener);
}
protected void send(HttpExchange exchange)
protected SendFailure send(HttpExchange exchange)
{
delegate.send(exchange);
return delegate.send(exchange);
}
@Override
@ -96,11 +97,11 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec
}
@Override
protected boolean onReadTimeout()
public boolean onIdleExpired()
{
if (LOG.isDebugEnabled())
LOG.debug("Idle timeout {}", this);
close(new TimeoutException());
boolean close = delegate.onIdleTimeout();
if (close)
close(new TimeoutException("Idle timeout " + getEndPoint().getIdleTimeout() + "ms"));
return false;
}
@ -142,7 +143,7 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec
protected void close(Throwable failure)
{
if (softClose())
if (closed.compareAndSet(false, true))
{
// First close then abort, to be sure that the connection cannot be reused
// from an onFailure() handler or by blocking code waiting for completion.
@ -158,11 +159,6 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec
}
}
public boolean softClose()
{
return closed.compareAndSet(false, true);
}
protected boolean abort(Throwable failure)
{
HttpExchange exchange = channel.getHttpExchange();
@ -204,21 +200,18 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec
}
@Override
protected void send(HttpExchange exchange)
protected SendFailure send(HttpExchange exchange)
{
Request request = exchange.getRequest();
normalizeRequest(request);
// Save the old idle timeout to restore it
// Save the old idle timeout to restore it.
EndPoint endPoint = getEndPoint();
idleTimeout = endPoint.getIdleTimeout();
endPoint.setIdleTimeout(request.getIdleTimeout());
// One channel per connection, just delegate the send
if (channel.associate(exchange))
channel.send();
else
channel.release();
// One channel per connection, just delegate the send.
return send(channel, exchange);
}
@Override

View File

@ -22,6 +22,7 @@ import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.client.PoolingHttpDestination;
import org.eclipse.jetty.client.SendFailure;
import org.eclipse.jetty.client.api.Connection;
public class HttpDestinationOverHTTP extends PoolingHttpDestination
@ -32,8 +33,8 @@ public class HttpDestinationOverHTTP extends PoolingHttpDestination
}
@Override
protected void send(Connection connection, HttpExchange exchange)
protected SendFailure send(Connection connection, HttpExchange exchange)
{
((HttpConnectionOverHTTP)connection).send(exchange);
return ((HttpConnectionOverHTTP)connection).send(exchange);
}
}

View File

@ -467,37 +467,24 @@ public class HttpClientTest extends AbstractHttpClientServerTest
@Test
public void test_QueuedRequest_IsSent_WhenPreviousRequestClosedConnection() throws Exception
{
start(new EmptyServerHandler());
start(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
if (target.endsWith("/one"))
baseRequest.getHttpChannel().getEndPoint().close();
else
baseRequest.setHandled(true);
}
});
client.setMaxConnectionsPerDestination(1);
final long idleTimeout = 1000;
client.setIdleTimeout(idleTimeout);
final CountDownLatch latch = new CountDownLatch(3);
final CountDownLatch latch = new CountDownLatch(2);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.path("/one")
.listener(new Request.Listener.Adapter()
{
@Override
public void onBegin(Request request)
{
try
{
TimeUnit.MILLISECONDS.sleep(2 * idleTimeout);
}
catch (InterruptedException x)
{
x.printStackTrace();
}
}
@Override
public void onFailure(Request request, Throwable failure)
{
latch.countDown();
}
})
.onResponseFailure((response, failure) -> latch.countDown())
.send(null);
@ -511,7 +498,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest
})
.send(null);
Assert.assertTrue(latch.await(5 * idleTimeout, TimeUnit.MILLISECONDS));
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@Test

View File

@ -171,7 +171,7 @@ public class HttpReceiverOverHTTPTest
FutureResponseListener listener = (FutureResponseListener)exchange.getResponseListeners().get(0);
connection.getHttpChannel().receive();
// Simulate an idle timeout
connection.onReadTimeout();
connection.onIdleExpired();
try
{

View File

@ -79,6 +79,7 @@ public class HttpChannelOverFCGI extends HttpChannel
if (exchange != null)
{
version = exchange.getRequest().getVersion();
idle.onOpen();
sender.send(exchange);
}
}
@ -91,6 +92,7 @@ public class HttpChannelOverFCGI extends HttpChannel
protected boolean responseBegin(int code, String reason)
{
idle.notIdle();
HttpExchange exchange = getHttpExchange();
if (exchange == null)
return false;
@ -106,12 +108,14 @@ public class HttpChannelOverFCGI extends HttpChannel
protected boolean responseHeaders()
{
idle.notIdle();
HttpExchange exchange = getHttpExchange();
return exchange != null && receiver.responseHeaders(exchange);
}
protected boolean content(ByteBuffer buffer, Callback callback)
{
idle.notIdle();
HttpExchange exchange = getHttpExchange();
if (exchange != null)
return receiver.responseContent(exchange, buffer, callback);
@ -151,6 +155,7 @@ public class HttpChannelOverFCGI extends HttpChannel
private class FCGIIdleTimeout extends IdleTimeout
{
private final HttpConnectionOverFCGI connection;
private boolean open;
public FCGIIdleTimeout(HttpConnectionOverFCGI connection, long idleTimeout)
{
@ -159,6 +164,21 @@ public class HttpChannelOverFCGI extends HttpChannel
setIdleTimeout(idleTimeout);
}
@Override
public void onOpen()
{
open = true;
notIdle();
super.onOpen();
}
@Override
public void onClose()
{
super.onClose();
open = false;
}
@Override
protected void onIdleExpired(TimeoutException timeout)
{
@ -170,7 +190,7 @@ public class HttpChannelOverFCGI extends HttpChannel
@Override
public boolean isOpen()
{
return connection.getEndPoint().isOpen();
return open;
}
}
}

View File

@ -31,6 +31,7 @@ import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpConnection;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.SendFailure;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
@ -82,15 +83,20 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
return destination;
}
protected Flusher getFlusher()
{
return flusher;
}
@Override
public void send(Request request, Response.CompleteListener listener)
{
delegate.send(request, listener);
}
protected void send(HttpExchange exchange)
protected SendFailure send(HttpExchange exchange)
{
delegate.send(exchange);
return delegate.send(exchange);
}
@Override
@ -185,9 +191,13 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
}
@Override
protected boolean onReadTimeout()
public boolean onIdleExpired()
{
close(new TimeoutException());
boolean close = delegate.onIdleTimeout();
if (multiplexed)
close &= isFillInterested();
if (close)
close(new TimeoutException("Idle timeout " + getEndPoint().getIdleTimeout() + "ms"));
return false;
}
@ -197,6 +207,11 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
destination.release(this);
}
public boolean isClosed()
{
return closed.get();
}
@Override
public void close()
{
@ -212,10 +227,10 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
getHttpDestination().close(this);
getEndPoint().shutdownOutput();
if (LOG.isDebugEnabled())
LOG.debug("{} oshut", this);
LOG.debug("Shutdown {}", this);
getEndPoint().close();
if (LOG.isDebugEnabled())
LOG.debug("{} closed", this);
LOG.debug("Closed {}", this);
abort(failure);
}
@ -270,6 +285,11 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
}
}
protected HttpChannelOverFCGI newHttpChannel(int id, Request request)
{
return new HttpChannelOverFCGI(this, getFlusher(), id, request.getIdleTimeout());
}
@Override
public String toString()
{
@ -288,19 +308,17 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
}
@Override
protected void send(HttpExchange exchange)
protected SendFailure send(HttpExchange exchange)
{
Request request = exchange.getRequest();
normalizeRequest(request);
// FCGI may be multiplexed, so create one channel for each request.
int id = acquireRequest();
HttpChannelOverFCGI channel = new HttpChannelOverFCGI(HttpConnectionOverFCGI.this, flusher, id, request.getIdleTimeout());
HttpChannelOverFCGI channel = newHttpChannel(id, request);
channels.put(id, channel);
if (channel.associate(exchange))
channel.send();
else
channel.release();
return send(channel, exchange);
}
@Override
@ -309,6 +327,11 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
HttpConnectionOverFCGI.this.close();
}
protected void close(Throwable failure)
{
HttpConnectionOverFCGI.this.close(failure);
}
@Override
public String toString()
{

View File

@ -22,6 +22,7 @@ import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.client.PoolingHttpDestination;
import org.eclipse.jetty.client.SendFailure;
import org.eclipse.jetty.client.api.Connection;
public class HttpDestinationOverFCGI extends PoolingHttpDestination
@ -32,8 +33,8 @@ public class HttpDestinationOverFCGI extends PoolingHttpDestination
}
@Override
protected void send(Connection connection, HttpExchange exchange)
protected SendFailure send(Connection connection, HttpExchange exchange)
{
((HttpConnectionOverFCGI)connection).send(exchange);
return ((HttpConnectionOverFCGI)connection).send(exchange);
}
}

View File

@ -22,6 +22,7 @@ import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.MultiplexHttpDestination;
import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.client.SendFailure;
import org.eclipse.jetty.client.api.Connection;
public class MultiplexHttpDestinationOverFCGI extends MultiplexHttpDestination
@ -32,8 +33,8 @@ public class MultiplexHttpDestinationOverFCGI extends MultiplexHttpDestination
}
@Override
protected void send(Connection connection, HttpExchange exchange)
protected SendFailure send(Connection connection, HttpExchange exchange)
{
((HttpConnectionOverFCGI)connection).send(exchange);
return ((HttpConnectionOverFCGI)connection).send(exchange);
}
}

View File

@ -113,11 +113,12 @@ public class HTTP2Connection extends AbstractConnection
}
@Override
protected boolean onReadTimeout()
public boolean onIdleExpired()
{
if (LOG.isDebugEnabled())
LOG.debug("Idle timeout {}ms expired on {}", getEndPoint().getIdleTimeout(), this);
session.onIdleTimeout();
boolean close = session.onIdleTimeout();
boolean idle = isFillInterested();
if (close && idle)
session.close(ErrorCode.NO_ERROR.code, "idle_timeout", Callback.NOOP);
return false;
}

View File

@ -535,7 +535,13 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
{
if (closed.compareAndSet(current, CloseState.LOCALLY_CLOSED))
{
byte[] payload = reason == null ? null : reason.getBytes(StandardCharsets.UTF_8);
byte[] payload = null;
if (reason != null)
{
// Trim the reason to avoid attack vectors.
reason = reason.substring(0, Math.min(reason.length(), 32));
payload = reason.getBytes(StandardCharsets.UTF_8);
}
GoAwayFrame frame = new GoAwayFrame(lastStreamId.get(), error, payload);
control(null, callback, frame);
return true;
@ -826,30 +832,29 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
* stuck because of TCP congestion), therefore we terminate.
* See {@link #onGoAway(GoAwayFrame)}.
*
* @return true if the session should be closed, false otherwise
* @see #onGoAway(GoAwayFrame)
* @see #close(int, String, Callback)
* @see #onShutdown()
*/
@Override
public void onIdleTimeout()
public boolean onIdleTimeout()
{
switch (closed.get())
{
case NOT_CLOSED:
{
// Real idle timeout, just close.
close(ErrorCode.NO_ERROR.code, "idle_timeout", Callback.NOOP);
break;
return notifyIdleTimeout(this);
}
case LOCALLY_CLOSED:
case REMOTELY_CLOSED:
{
abort(new TimeoutException());
break;
return false;
}
default:
{
break;
return false;
}
}
}
@ -974,6 +979,19 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
}
}
protected boolean notifyIdleTimeout(Session session)
{
try
{
return listener.onIdleTimeout(session);
}
catch (Throwable x)
{
LOG.info("Failure while notifying listener " + listener, x);
return true;
}
}
protected void notifyFailure(Session session, Throwable failure)
{
try

View File

@ -118,7 +118,7 @@ public interface ISession extends Session
* @see #onShutdown()
* @see #close(int, String, Callback)
*/
public void onIdleTimeout();
public boolean onIdleTimeout();
/**
* <p>Callback method invoked during an HTTP/1.1 to HTTP/2 upgrade requests

View File

@ -200,6 +200,13 @@ public interface Session
*/
public void onClose(Session session, GoAwayFrame frame);
/**
* <p>Callback method invoked when the idle timeout expired.</p>
* @param session the session
* @return whether the session should be closed
*/
public boolean onIdleTimeout(Session session);
/**
* <p>Callback method invoked when a failure has been detected for this session.</p>
*
@ -245,6 +252,12 @@ public interface Session
{
}
@Override
public boolean onIdleTimeout(Session session)
{
return true;
}
@Override
public void onFailure(Session session, Throwable failure)
{

View File

@ -174,6 +174,12 @@ public class HttpClientTransportOverHTTP2 extends ContainerLifeCycle implements
connection.close();
}
@Override
public boolean onIdleTimeout(Session session)
{
return connection.onIdleTimeout();
}
@Override
public void onFailure(Session session, Throwable failure)
{

View File

@ -25,6 +25,7 @@ import org.eclipse.jetty.client.HttpChannel;
import org.eclipse.jetty.client.HttpConnection;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.SendFailure;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.util.Callback;
@ -41,17 +42,26 @@ public class HttpConnectionOverHTTP2 extends HttpConnection
this.session = session;
}
public Session getSession()
{
return session;
}
@Override
protected void send(HttpExchange exchange)
protected SendFailure send(HttpExchange exchange)
{
normalizeRequest(exchange.getRequest());
// One connection maps to N channels, so for each exchange we create a new channel.
HttpChannel channel = new HttpChannelOverHTTP2(getHttpDestination(), this, session);
HttpChannel channel = newHttpChannel();
channels.add(channel);
if (channel.associate(exchange))
channel.send();
else
channel.release();
return send(channel, exchange);
}
protected HttpChannelOverHTTP2 newHttpChannel()
{
return new HttpChannelOverHTTP2(getHttpDestination(), this, getSession());
}
protected void release(HttpChannel channel)
@ -71,7 +81,7 @@ public class HttpConnectionOverHTTP2 extends HttpConnection
// First close then abort, to be sure that the connection cannot be reused
// from an onFailure() handler or by blocking code waiting for completion.
getHttpDestination().close(this);
session.close(ErrorCode.NO_ERROR.code, null, Callback.NOOP);
session.close(ErrorCode.NO_ERROR.code, failure.getMessage(), Callback.NOOP);
abort(failure);
}

View File

@ -22,6 +22,7 @@ import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.MultiplexHttpDestination;
import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.client.SendFailure;
import org.eclipse.jetty.client.api.Connection;
public class HttpDestinationOverHTTP2 extends MultiplexHttpDestination
@ -32,8 +33,8 @@ public class HttpDestinationOverHTTP2 extends MultiplexHttpDestination
}
@Override
protected void send(Connection connection, HttpExchange exchange)
protected SendFailure send(Connection connection, HttpExchange exchange)
{
((HttpConnectionOverHTTP2)connection).send(exchange);
return ((HttpConnectionOverHTTP2)connection).send(exchange);
}
}

View File

@ -61,6 +61,12 @@ public abstract class AbstractConnection implements Connection
listeners.add(listener);
}
@Override
public void removeListener(Listener listener)
{
listeners.remove(listener);
}
public int getInputBufferSize()
{
return _inputBufferSize;
@ -214,6 +220,12 @@ public abstract class AbstractConnection implements Connection
getEndPoint().close();
}
@Override
public boolean onIdleExpired()
{
return true;
}
@Override
public int getMessagesIn()
{

View File

@ -370,6 +370,10 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint
@Override
protected void onIdleExpired(TimeoutException timeout)
{
Connection connection = _connection;
if (connection != null && !_connection.onIdleExpired())
return;
boolean output_shutdown=isOutputShutdown();
boolean input_shutdown=isInputShutdown();
boolean fillFailed = _fillInterest.onFail(timeout);

View File

@ -33,12 +33,28 @@ import org.eclipse.jetty.util.component.Container;
*/
public interface Connection extends Closeable
{
/**
* <p>Adds a listener of connection events.</p>
*
* @param listener the listener to add
*/
public void addListener(Listener listener);
/**
* <p>Removes a listener of connection events.</p>
*
* @param listener the listener to remove
*/
public void removeListener(Listener listener);
/**
* <p>Callback method invoked when this connection is opened.</p>
* <p>Creators of the connection implementation are responsible for calling this method.</p>
*/
public void onOpen();
/**
* <p>Callback method invoked when this {@link Connection} is closed.</p>
* <p>Callback method invoked when this connection is closed.</p>
* <p>Creators of the connection implementation are responsible for calling this method.</p>
*/
public void onClose();
@ -57,6 +73,19 @@ public interface Connection extends Closeable
@Override
public void close();
/**
* <p>Callback method invoked upon an idle timeout event.</p>
* <p>Implementations of this method may return true to indicate that the idle timeout
* handling should proceed normally, typically failing the EndPoint and causing it to
* be closed.</p>
* <p>When false is returned, the handling of the idle timeout event is halted
* immediately and the EndPoint left in the state it was before the idle timeout event.</p>
*
* @return true to let the EndPoint handle the idle timeout,
* false to tell the EndPoint to halt the handling of the idle timeout.
*/
public boolean onIdleExpired();
public int getMessagesIn();
public int getMessagesOut();
public long getBytesIn();
@ -65,10 +94,11 @@ public interface Connection extends Closeable
public interface UpgradeFrom
{
/* ------------------------------------------------------------ */
/** Take the input buffer from the connection on upgrade.
/**
* <p>Takes the input buffer from the connection on upgrade.</p>
* <p>This method is used to take any unconsumed input from
* a connection during an upgrade.
* a connection during an upgrade.</p>
*
* @return A buffer of unconsumed input. The caller must return the buffer
* to the bufferpool when consumed and this connection must not.
*/
@ -78,7 +108,7 @@ public interface Connection extends Closeable
public interface UpgradeTo
{
/**
* <p>Callback method invoked when this {@link Connection} is upgraded.</p>
* <p>Callback method invoked when this connection is upgraded.</p>
* <p>This must be called before {@link #onOpen()}.</p>
* @param prefilled An optional buffer that can contain prefilled data. Typically this
* results from an upgrade of one protocol to the other where the old connection has buffered
@ -88,8 +118,6 @@ public interface Connection extends Closeable
void onUpgradeTo(ByteBuffer prefilled);
}
/* ------------------------------------------------------------ */
/**
* <p>A Listener for connection events.</p>
* <p>Listeners can be added to a {@link Connection} to get open and close events.

View File

@ -172,6 +172,12 @@ public class SslConnection extends AbstractConnection
getDecryptedEndPoint().getConnection().close();
}
@Override
public boolean onIdleExpired()
{
return getDecryptedEndPoint().getConnection().onIdleExpired();
}
@Override
public void onFillable()
{
@ -329,35 +335,41 @@ public class SslConnection extends AbstractConnection
public DecryptedEndPoint()
{
super(((AbstractEndPoint)getEndPoint()).getScheduler());
setIdleTimeout(getEndPoint().getIdleTimeout());
// Disable idle timeout checking: no scheduler and -1 timeout for this instance.
super(null);
super.setIdleTimeout(-1);
}
@Override
public long getIdleTimeout()
{
return getEndPoint().getIdleTimeout();
}
@Override
public void setIdleTimeout(long idleTimeout)
{
getEndPoint().setIdleTimeout(idleTimeout);
}
@Override
public boolean isOpen()
{
return getEndPoint().isOpen();
}
@Override
public InetSocketAddress getLocalAddress()
{
return getEndPoint().getLocalAddress();
}
@Override
public InetSocketAddress getRemoteAddress()
{
return getEndPoint().getRemoteAddress();
}
@Override
public void setIdleTimeout(long idleTimeout)
{
super.setIdleTimeout(idleTimeout);
getEndPoint().setIdleTimeout(idleTimeout);
}
@Override
protected WriteFlusher getWriteFlusher()
{
@ -462,10 +474,10 @@ public class SslConnection extends AbstractConnection
}
}
}
if (fillable)
getExecutor().execute(_runFillable);
else
else
ensureFillInterested();
}
}
@ -729,13 +741,13 @@ public class SslConnection extends AbstractConnection
// will return 0 (even if some handshake bytes were flushed and filled).
// it is the applications responsibility to call flush again - either in a busy loop
// or better yet by using EndPoint#write to do the flushing.
if (DEBUG)
{
for (ByteBuffer b : appOuts)
LOG.debug("{} flush {}", SslConnection.this, BufferUtil.toHexSummary(b));
}
try
{
if (_cannotAcceptMoreAppDataToFlush)
@ -765,7 +777,7 @@ public class SslConnection extends AbstractConnection
}
if (DEBUG)
LOG.debug("{} wrap {}", SslConnection.this, wrapResult.toString().replace('\n',' '));
Status wrapResultStatus = wrapResult.getStatus();
boolean allConsumed=true;
@ -924,7 +936,7 @@ public class SslConnection extends AbstractConnection
if (!SslConnection.this.isFillInterested())
SslConnection.this.fillInterested();
}
@Override
public boolean isOutputShutdown()
{
@ -955,12 +967,6 @@ public class SslConnection extends AbstractConnection
super.doClose();
}
@Override
public boolean isOpen()
{
return getEndPoint().isOpen();
}
@Override
public Object getTransport()
{

View File

@ -181,7 +181,7 @@ public class ResourceCache implements HttpContent.Factory
* Get either a valid entry object or create a new one if possible.
*
* @param pathInContext The key into the cache
* @param maxBuffer The maximum buffer to allocated for this request. For cached content, a larger buffer may have
* @param maxBufferSize The maximum buffer to allocated for this request. For cached content, a larger buffer may have
* previously been allocated and returned by the {@link HttpContent#getDirectBuffer()} or {@link HttpContent#getIndirectBuffer()} calls.
* @return The entry matching <code>pathInContext</code>, or a new entry
* if no matching entry was found. If the content exists but is not cachable,

View File

@ -92,4 +92,3 @@ public class Loader
return loader==null ? ResourceBundle.getBundle(name, locale) : ResourceBundle.getBundle(name, locale, loader);
}
}

View File

@ -73,6 +73,12 @@ public abstract class AbstractTest
}
public void start(Handler handler) throws Exception
{
startServer(handler);
startClient();
}
protected void startServer(Handler handler) throws Exception
{
sslContextFactory = new SslContextFactory();
sslContextFactory.setKeyStorePath("src/test/resources/keystore.jks");
@ -81,12 +87,6 @@ public abstract class AbstractTest
sslContextFactory.setTrustStorePassword("storepwd");
sslContextFactory.setUseCipherSuitesOrder(true);
sslContextFactory.setCipherComparator(HTTP2Cipher.COMPARATOR);
startServer(handler);
startClient();
}
private void startServer(Handler handler) throws Exception
{
QueuedThreadPool serverThreads = new QueuedThreadPool();
serverThreads.setName("server");
server = new Server(serverThreads);

View File

@ -0,0 +1,228 @@
//
// ========================================================================
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http.client;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpClientTransport;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.http.HttpChannelOverHTTP;
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
import org.eclipse.jetty.client.http.HttpConnectionOverHTTP;
import org.eclipse.jetty.fcgi.client.http.HttpChannelOverFCGI;
import org.eclipse.jetty.fcgi.client.http.HttpClientTransportOverFCGI;
import org.eclipse.jetty.fcgi.client.http.HttpConnectionOverFCGI;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.client.HTTP2Client;
import org.eclipse.jetty.http2.client.http.HttpChannelOverHTTP2;
import org.eclipse.jetty.http2.client.http.HttpClientTransportOverHTTP2;
import org.eclipse.jetty.http2.client.http.HttpConnectionOverHTTP2;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.Assert;
import org.junit.Test;
public class HttpChannelAssociationTest extends AbstractTest
{
public HttpChannelAssociationTest(Transport transport)
{
super(transport);
}
@Test
public void testAssociationFailedAbortsRequest() throws Exception
{
startServer(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
}
});
client = new HttpClient(newHttpClientTransport(transport, exchange -> false), sslContextFactory);
QueuedThreadPool clientThreads = new QueuedThreadPool();
clientThreads.setName("client");
client.setExecutor(clientThreads);
client.start();
CountDownLatch latch = new CountDownLatch(1);
client.newRequest(newURI())
.send(result ->
{
if (result.isFailed())
latch.countDown();
});
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@Test
public void testIdleTimeoutJustBeforeAssociation() throws Exception
{
startServer(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
}
});
long idleTimeout = 1000;
client = new HttpClient(newHttpClientTransport(transport, exchange ->
{
// We idle timeout just before the association,
// we must be able to send the request successfully.
sleep(2 * idleTimeout);
return true;
}), sslContextFactory);
QueuedThreadPool clientThreads = new QueuedThreadPool();
clientThreads.setName("client");
client.setExecutor(clientThreads);
client.setIdleTimeout(idleTimeout);
client.start();
CountDownLatch latch = new CountDownLatch(1);
client.newRequest(newURI())
.send(result ->
{
if (result.isSucceeded())
latch.countDown();
});
Assert.assertTrue(latch.await(5 * idleTimeout, TimeUnit.MILLISECONDS));
}
private HttpClientTransport newHttpClientTransport(Transport transport, Predicate<HttpExchange> code)
{
switch (transport)
{
case HTTP:
case HTTPS:
{
return new HttpClientTransportOverHTTP(1)
{
@Override
protected HttpConnectionOverHTTP newHttpConnection(EndPoint endPoint, HttpDestination destination, Promise<Connection> promise)
{
return new HttpConnectionOverHTTP(endPoint, destination, promise)
{
@Override
protected HttpChannelOverHTTP newHttpChannel()
{
return new HttpChannelOverHTTP(this)
{
@Override
public boolean associate(HttpExchange exchange)
{
return code.test(exchange) && super.associate(exchange);
}
};
}
};
}
};
}
case H2C:
case H2:
{
HTTP2Client http2Client = new HTTP2Client();
http2Client.setSelectors(1);
return new HttpClientTransportOverHTTP2(http2Client)
{
@Override
protected HttpConnectionOverHTTP2 newHttpConnection(HttpDestination destination, Session session)
{
return new HttpConnectionOverHTTP2(destination, session)
{
@Override
protected HttpChannelOverHTTP2 newHttpChannel()
{
return new HttpChannelOverHTTP2(getHttpDestination(), this, getSession())
{
@Override
public boolean associate(HttpExchange exchange)
{
return code.test(exchange) && super.associate(exchange);
}
};
}
};
}
};
}
case FCGI:
{
return new HttpClientTransportOverFCGI(1, false, "")
{
@Override
protected HttpConnectionOverFCGI newHttpConnection(EndPoint endPoint, HttpDestination destination, Promise<Connection> promise)
{
return new HttpConnectionOverFCGI(endPoint, destination, promise, isMultiplexed())
{
@Override
protected HttpChannelOverFCGI newHttpChannel(int id, org.eclipse.jetty.client.api.Request request)
{
return new HttpChannelOverFCGI(this, getFlusher(), id, request.getIdleTimeout())
{
@Override
public boolean associate(HttpExchange exchange)
{
return code.test(exchange) && super.associate(exchange);
}
};
}
};
}
};
}
default:
{
throw new IllegalArgumentException();
}
}
}
private void sleep(long time)
{
try
{
Thread.sleep(time);
}
catch (InterruptedException x)
{
throw new RuntimeException(x);
}
}
}