From 822abe514e55c7795cc4721d86c4d8d059e9b218 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Tue, 24 Jul 2012 10:38:28 +0200 Subject: [PATCH] Jetty9 - Improved idle timeout handling. Idle timeouts are not enforced anymore by polling the AsyncEndPoints from the SelectorManager, but instead the AsyncEndPoints now take a SchedulerExecutorService as parameter and perform their own enforcing of the idle timeout. Also removed a few Timer usages (replaced by SchedulerExecutorService) and fixed XML files referencing old APIs. --- .../resources/binding-test-contexts-1.xml | 8 +- .../src/test/resources/jetty-deploy-wars.xml | 4 +- .../eclipse/jetty/io/AbstractEndPoint.java | 14 +- .../jetty/io/AsyncByteArrayEndPoint.java | 130 ++++----- .../org/eclipse/jetty/io/AsyncEndPoint.java | 97 ++++--- .../java/org/eclipse/jetty/io/EndPoint.java | 6 +- .../NetworkTrafficSelectChannelEndPoint.java | 5 +- .../jetty/io/SelectChannelEndPoint.java | 71 +++-- .../org/eclipse/jetty/io/SelectorManager.java | 68 +---- .../eclipse/jetty/io/ssl/SslConnection.java | 5 - .../jetty/io/AsyncByteArrayEndPointTest.java | 121 ++++---- .../jetty/io/SelectChannelEndPointTest.java | 268 +++++++++--------- .../eclipse/jetty/io/SslConnectionTest.java | 5 +- .../jetty-rewrite.xml | 4 +- .../jetty/server/AbstractConnector.java | 189 +++--------- .../jetty/server/AbstractNetConnector.java | 100 +++++++ .../org/eclipse/jetty/server/Connector.java | 4 +- .../org/eclipse/jetty/server/HttpChannel.java | 103 +++---- .../jetty/server/HttpChannelState.java | 177 ++++++------ .../eclipse/jetty/server/HttpConnection.java | 15 +- .../eclipse/jetty/server/HttpConnector.java | 66 +++-- .../jetty/server/LocalHttpConnector.java | 29 +- .../jetty/server/SelectChannelConnector.java | 17 +- .../NetworkTrafficSelectChannelConnector.java | 2 +- .../eclipse/jetty/server/HttpWriterTest.java | 26 +- .../eclipse/jetty/server/ResponseTest.java | 18 +- .../server/SelectChannelStatisticsTest.java | 11 +- .../client/WebSocketClientFactory.java | 64 +++-- .../io/WebSocketClientSelectorManager.java | 8 +- .../src/test/resources/RFC2616Base.xml | 60 ++-- 30 files changed, 812 insertions(+), 883 deletions(-) create mode 100644 jetty-server/src/main/java/org/eclipse/jetty/server/AbstractNetConnector.java diff --git a/jetty-deploy/src/test/resources/binding-test-contexts-1.xml b/jetty-deploy/src/test/resources/binding-test-contexts-1.xml index 65e13ab5cf0..38979a28ab5 100644 --- a/jetty-deploy/src/test/resources/binding-test-contexts-1.xml +++ b/jetty-deploy/src/test/resources/binding-test-contexts-1.xml @@ -25,13 +25,13 @@ org.eclipse.jetty.servlet.DefaultServlet - + - - + + @@ -41,7 +41,7 @@ - + diff --git a/jetty-deploy/src/test/resources/jetty-deploy-wars.xml b/jetty-deploy/src/test/resources/jetty-deploy-wars.xml index 6ab95b65d58..859887b555a 100644 --- a/jetty-deploy/src/test/resources/jetty-deploy-wars.xml +++ b/jetty-deploy/src/test/resources/jetty-deploy-wars.xml @@ -3,13 +3,13 @@ - + - + diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java index e5f080ec794..e1918dd1b43 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java @@ -7,7 +7,7 @@ public abstract class AbstractEndPoint implements EndPoint private final long _created=System.currentTimeMillis(); private final InetSocketAddress _local; private final InetSocketAddress _remote; - private volatile long _maxIdleTime; + private volatile long _idleTimeout; private volatile long _idleTimestamp=System.currentTimeMillis(); @@ -27,42 +27,37 @@ public abstract class AbstractEndPoint implements EndPoint @Override public long getIdleTimeout() { - return _maxIdleTime; + return _idleTimeout; } @Override - public void setIdleTimeout(long timeMs) + public void setIdleTimeout(long idleTimeout) { - _maxIdleTime=timeMs; + _idleTimeout = idleTimeout; } - /* ------------------------------------------------------------ */ @Override public InetSocketAddress getLocalAddress() { return _local; } - /* ------------------------------------------------------------ */ @Override public InetSocketAddress getRemoteAddress() { return _remote; } - /* ------------------------------------------------------------ */ public long getIdleTimestamp() { return _idleTimestamp; } - /* ------------------------------------------------------------ */ protected void notIdle() { _idleTimestamp=System.currentTimeMillis(); } - /* ------------------------------------------------------------ */ @Override public String toString() { @@ -74,5 +69,4 @@ public abstract class AbstractEndPoint implements EndPoint isOpen(), isOutputShutdown()); } - } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncByteArrayEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncByteArrayEndPoint.java index 27d11d5558e..52d4b924263 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncByteArrayEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncByteArrayEndPoint.java @@ -3,35 +3,20 @@ package org.eclipse.jetty.io; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; +import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; -public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEndPoint +public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEndPoint, Runnable { - private static final int TICK=Integer.getInteger("org.eclipse.jetty.io.AsyncByteArrayEndPoint.TICK",100); - public static final Logger LOG=Log.getLogger(AsyncByteArrayEndPoint.class); - private final ScheduledExecutorService _timer; - private AsyncConnection _connection; - - private final Runnable _checkTimeout=new Runnable() - { - @Override - public void run() - { - if (isOpen()) - { - checkTimeout(System.currentTimeMillis()); - if (isOpen()) - _timer.schedule(_checkTimeout,TICK,TimeUnit.MILLISECONDS); - } - } - }; + private static final Logger LOG = Log.getLogger(AsyncByteArrayEndPoint.class); private final ReadInterest _readInterest = new ReadInterest() { @@ -40,10 +25,9 @@ public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEn { if (_closed) throw new ClosedChannelException(); - return _in==null || BufferUtil.hasContent(_in); + return _in == null || BufferUtil.hasContent(_in); } }; - private final WriteFlusher _writeFlusher = new WriteFlusher(this) { @Override @@ -52,33 +36,80 @@ public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEn // Don't need to do anything here as takeOutput does the signalling. } }; + private final AtomicReference> _timeout = new AtomicReference<>(); + private final ScheduledExecutorService _scheduler; + private volatile AsyncConnection _connection; - public AsyncByteArrayEndPoint(ScheduledExecutorService timer) + public AsyncByteArrayEndPoint(ScheduledExecutorService scheduler, long idleTimeout) { - super(); - _timer=timer; - _timer.schedule(_checkTimeout,TICK,TimeUnit.MILLISECONDS); + _scheduler = scheduler; + setIdleTimeout(idleTimeout); } - public AsyncByteArrayEndPoint(ScheduledExecutorService timer, byte[] input, int outputSize) + public AsyncByteArrayEndPoint(ScheduledExecutorService timer, long idleTimeout, byte[] input, int outputSize) { - super(input,outputSize); - _timer=timer; - _timer.schedule(_checkTimeout,TICK,TimeUnit.MILLISECONDS); + super(input, outputSize); + _scheduler = timer; + setIdleTimeout(idleTimeout); } - public AsyncByteArrayEndPoint(ScheduledExecutorService timer, String input, int outputSize) + public AsyncByteArrayEndPoint(ScheduledExecutorService timer, long idleTimeout, String input, int outputSize) { - super(input,outputSize); - _timer=timer; - _timer.schedule(_checkTimeout,TICK,TimeUnit.MILLISECONDS); + super(input, outputSize); + _scheduler = timer; + setIdleTimeout(idleTimeout); + } + + @Override + public void setIdleTimeout(long idleTimeout) + { + super.setIdleTimeout(idleTimeout); + scheduleIdleTimeout(idleTimeout); + } + + private void scheduleIdleTimeout(long delay) + { + Future newTimeout = isOpen() && delay > 0 ? _scheduler.schedule(this, delay, TimeUnit.MILLISECONDS) : null; + Future oldTimeout = _timeout.getAndSet(newTimeout); + if (oldTimeout != null) + oldTimeout.cancel(false); + } + + @Override + public void run() + { + if (isOpen()) + { + long idleTimestamp = getIdleTimestamp(); + long idleTimeout = getIdleTimeout(); + long idleElapsed = System.currentTimeMillis() - idleTimestamp; + long idleLeft = idleTimeout - idleElapsed; + + if (isOutputShutdown() || _readInterest.isInterested() || _writeFlusher.isWriting()) + { + if (idleTimestamp != 0 && idleTimeout > 0) + { + if (idleLeft < 0) + { + if (isOutputShutdown()) + close(); + notIdle(); + + TimeoutException timeout = new TimeoutException("Idle timeout expired: " + idleElapsed + "/" + idleTimeout + " ms"); + _readInterest.failed(timeout); + _writeFlusher.failed(timeout); + } + } + } + scheduleIdleTimeout(idleLeft > 0 ? idleLeft : idleTimeout); + } } @Override public void setInput(ByteBuffer in) { super.setInput(in); - if (in==null || BufferUtil.hasContent(in)) + if (in == null || BufferUtil.hasContent(in)) _readInterest.readable(); } @@ -126,36 +157,7 @@ public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEn @Override public void setAsyncConnection(AsyncConnection connection) { - _connection=connection; - } - - @Override - public void checkTimeout(long now) - { - synchronized (this) - { - if (isOutputShutdown() || _readInterest.isInterested() || _writeFlusher.isWriting()) - { - long idleTimestamp = getIdleTimestamp(); - long idleTimeout = getIdleTimeout(); - - if (idleTimestamp != 0 && idleTimeout > 0) - { - long idleForMs = now - idleTimestamp; - - if (idleForMs > idleTimeout) - { - if (isOutputShutdown()) - close(); - notIdle(); - - TimeoutException timeout = new TimeoutException("idle "+idleForMs+"ms"); - _readInterest.failed(timeout); - _writeFlusher.failed(timeout); - } - } - } - } + _connection = connection; } @Override diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncEndPoint.java index 9e25df3638f..1d770dc0416 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncEndPoint.java @@ -21,30 +21,30 @@ import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.ExecutorCallback; import org.eclipse.jetty.util.FutureCallback; -/* ------------------------------------------------------------ */ -/**Asynchronous End Point - *

- * This extension of EndPoint provides asynchronous scheduling methods. - * The design of these has been influenced by NIO.2 Futures and Completion - * handlers, but does not use those actual interfaces because: they have - * some inefficiencies. - *

- * This class will frequently be used in conjunction with some of the utility +/** + *

{@link AsyncEndPoint} add asynchronous scheduling methods to {@link EndPoint}.

+ *

The design of these has been influenced by NIO.2 Futures and Completion + * handlers, but does not use those actual interfaces because they have + * some inefficiencies.

+ *

This class will frequently be used in conjunction with some of the utility * implementations of {@link Callback}, such as {@link FutureCallback} and - * {@link ExecutorCallback}. Examples are: + * {@link ExecutorCallback}. Examples are:

+ * *

Blocking Read

- * A FutureCallback can be used to block until an endpoint is ready to be filled + *

A FutureCallback can be used to block until an endpoint is ready to be filled * from: *

- * FutureCallback future = new FutureCallback<>();
+ * FutureCallback<String> future = new FutureCallback<>();
  * endpoint.fillInterested("ContextObj",future);
  * ...
  * String context = future.get(); // This blocks
- * int filled=endpoint.fill(mybuffer);
+ * int filled=endpoint.fill(mybuffer); + *

+ * *

Dispatched Read

- * By using a different callback, the read can be done asynchronously in its own dispatched thread: + *

By using a different callback, the read can be done asynchronously in its own dispatched thread: *

- * endpoint.fillInterested("ContextObj",new ExecutorCallback(executor)
+ * endpoint.fillInterested("ContextObj",new ExecutorCallback<String>(executor)
  * {
  *   public void onCompleted(String context)
  *   {
@@ -52,25 +52,25 @@ import org.eclipse.jetty.util.FutureCallback;
  *     ...
  *   }
  *   public void onFailed(String context,Throwable cause) {...}
- * });
- * The executor callback can also be customized to not dispatch in some circumstances when - * it knows it can use the callback thread and does not need to dispatch. + * }); + *

+ *

The executor callback can also be customized to not dispatch in some circumstances when + * it knows it can use the callback thread and does not need to dispatch.

* *

Blocking Write

- * The write contract is that the callback complete is not called until all data has been + *

The write contract is that the callback complete is not called until all data has been * written or there is a failure. For blocking this looks like: - * *

- * FutureCallback future = new FutureCallback<>();
+ * FutureCallback<String> future = new FutureCallback<>();
  * endpoint.write("ContextObj",future,headerBuffer,contentBuffer);
  * String context = future.get(); // This blocks
- * 
+ *

* *

Dispatched Write

- * Note also that multiple buffers may be passed in write so that gather writes + *

Note also that multiple buffers may be passed in write so that gather writes * can be done: *

- * endpoint.write("ContextObj",new ExecutorCallback(executor)
+ * endpoint.write("ContextObj",new ExecutorCallback<String>(executor)
  * {
  *   public void onCompleted(String context)
  *   {
@@ -78,45 +78,52 @@ import org.eclipse.jetty.util.FutureCallback;
  *     ...
  *   }
  *   public void onFailed(String context,Throwable cause) {...}
- * },headerBuffer,contentBuffer);
- * + * },headerBuffer,contentBuffer); + *

*/ public interface AsyncEndPoint extends EndPoint { - /** Asynchronous a fillable notification. - *

- * This method schedules a callback operations when a call to {@link #fill(ByteBuffer)} will return data or EOF. - * @param context Context to return via the callback - * @param callback The callback to call when an error occurs or we are readable. + /** + *

Requests callback methods to be invoked when a call to {@link #fill(ByteBuffer)} would return data or EOF.

+ * + * @param context the context to return via the callback + * @param callback the callback to call when an error occurs or we are readable. * @throws ReadPendingException if another read operation is concurrent. */ void fillInterested(C context, Callback callback) throws ReadPendingException; - /** Asynchronous write operation. - *

- * This method performs {@link #flush(ByteBuffer...)} operation(s) and do a callback when all the data - * has been flushed or an error occurs. - * @param context Context to return via the callback - * @param callback The callback to call when an error occurs or we are readable. - * @param buffers One or more {@link ByteBuffer}s that will be flushed. + /** + *

Writes the given buffers via {@link #flush(ByteBuffer...)} and invokes callback methods when either + * all the data has been flushed or an error occurs.

+ * + * @param context the context to return via the callback + * @param callback the callback to call when an error occurs or the write completed. + * @param buffers one or more {@link ByteBuffer}s that will be flushed. * @throws WritePendingException if another write operation is concurrent. */ void write(C context, Callback callback, ByteBuffer... buffers) throws WritePendingException; /** - * @return Timestamp in ms since epoch of when the last data was - * filled or flushed from this endpoint. + * @return the {@link AsyncConnection} associated with this {@link AsyncEndPoint} + * @see #setAsyncConnection(AsyncConnection) */ - long getIdleTimestamp(); - AsyncConnection getAsyncConnection(); + /** + * @param connection the {@link AsyncConnection} associated with this {@link AsyncEndPoint} + * @see #getAsyncConnection() + */ void setAsyncConnection(AsyncConnection connection); + /** + *

Callback method invoked when this {@link AsyncEndPoint} is opened.

+ * @see #onClose() + */ void onOpen(); + /** + *

Callback method invoked when this {@link AsyncEndPoint} is close.

+ * @see #onOpen() + */ void onClose(); - - void checkTimeout(long now); - } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java index baefc93d312..692890fe39c 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java @@ -119,10 +119,10 @@ public interface EndPoint long getIdleTimeout(); /* ------------------------------------------------------------ */ - /** Set the max idle time. - * @param timeMs the max idle time in MS. Timeout <= 0 implies an infinite timeout + /** Set the idle timeout. + * @param idleTimeout the idle timeout in MS. Timeout <= 0 implies an infinite timeout */ - void setIdleTimeout(long timeMs); + void setIdleTimeout(long idleTimeout); diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/NetworkTrafficSelectChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/NetworkTrafficSelectChannelEndPoint.java index 827ba0e272f..304b79b381f 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/NetworkTrafficSelectChannelEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/NetworkTrafficSelectChannelEndPoint.java @@ -18,6 +18,7 @@ import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.util.List; +import java.util.concurrent.ScheduledExecutorService; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; @@ -28,9 +29,9 @@ public class NetworkTrafficSelectChannelEndPoint extends SelectChannelEndPoint private final List listeners; - public NetworkTrafficSelectChannelEndPoint(SocketChannel channel, SelectorManager.ManagedSelector selectSet, SelectionKey key, long idleTimeout, List listeners) throws IOException + public NetworkTrafficSelectChannelEndPoint(SocketChannel channel, SelectorManager.ManagedSelector selectSet, SelectionKey key, ScheduledExecutorService scheduler, long idleTimeout, List listeners) throws IOException { - super(channel, selectSet, key, idleTimeout); + super(channel, selectSet, key, scheduler, idleTimeout); this.listeners = listeners; } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java index ba889506fa2..7defd64defe 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java @@ -17,8 +17,12 @@ import java.nio.ByteBuffer; import java.nio.channels.CancelledKeyException; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import org.eclipse.jetty.io.SelectorManager.ManagedSelector; import org.eclipse.jetty.util.Callback; @@ -32,21 +36,19 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, { public static final Logger LOG = Log.getLogger(SelectChannelEndPoint.class); - private final SelectorManager.ManagedSelector _selector; - private final SelectionKey _key; - - /** - * The desired value for {@link SelectionKey#interestOps()} - */ - private volatile int _interestOps; - + private final AtomicReference> _timeout = new AtomicReference<>(); + private final Runnable _idleTask = new Runnable() + { + @Override + public void run() + { + checkIdleTimeout(); + } + }; /** * true if {@link ManagedSelector#destroyEndPoint(AsyncEndPoint)} has not been called */ private final AtomicBoolean _open = new AtomicBoolean(); - - private volatile AsyncConnection _connection; - private final ReadInterest _readInterest = new ReadInterest() { @Override @@ -56,7 +58,6 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, return false; } }; - private final WriteFlusher _writeFlusher = new WriteFlusher(this) { @Override @@ -65,15 +66,39 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, updateKey(SelectionKey.OP_WRITE, true); } }; + private final SelectorManager.ManagedSelector _selector; + private final SelectionKey _key; + private final ScheduledExecutorService _scheduler; + private volatile AsyncConnection _connection; + /** + * The desired value for {@link SelectionKey#interestOps()} + */ + private volatile int _interestOps; - public SelectChannelEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key, long idleTimeout) throws IOException + public SelectChannelEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key, ScheduledExecutorService scheduler, long idleTimeout) throws IOException { super(channel); _selector = selector; _key = key; + _scheduler = scheduler; setIdleTimeout(idleTimeout); } + @Override + public void setIdleTimeout(long idleTimeout) + { + super.setIdleTimeout(idleTimeout); + scheduleIdleTimeout(idleTimeout); + } + + private void scheduleIdleTimeout(long delay) + { + Future newTimeout = isOpen() && delay > 0 ? _scheduler.schedule(_idleTask, delay, TimeUnit.MILLISECONDS) : null; + Future oldTimeout = _timeout.getAndSet(newTimeout); + if (oldTimeout != null) + oldTimeout.cancel(false); + } + @Override public void fillInterested(C context, Callback callback) throws IllegalStateException { @@ -111,32 +136,32 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, _writeFlusher.completeWrite(); } - @Override - public void checkTimeout(long now) + private void checkIdleTimeout() { - synchronized (this) + if (isOpen()) { + long idleTimestamp = getIdleTimestamp(); + long idleTimeout = getIdleTimeout(); + long idleElapsed = System.currentTimeMillis() - idleTimestamp; + long idleLeft = idleTimeout - idleElapsed; + if (isOutputShutdown() || _readInterest.isInterested() || _writeFlusher.isWriting()) { - long idleTimestamp = getIdleTimestamp(); - long idleTimeout = getIdleTimeout(); - if (idleTimestamp != 0 && idleTimeout > 0) { - long idleForMs = now - idleTimestamp; - - if (idleForMs > idleTimeout) + if (idleLeft < 0) { if (isOutputShutdown()) close(); notIdle(); - TimeoutException timeout = new TimeoutException("idle " + idleForMs + "ms"); + TimeoutException timeout = new TimeoutException("Idle timeout expired: " + idleElapsed + "/" + idleTimeout + " ms"); _readInterest.failed(timeout); _writeFlusher.failed(timeout); } } } + scheduleIdleTimeout(idleLeft > 0 ? idleLeft : idleTimeout); } } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java index beab3976376..e0e3ef76641 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java @@ -25,17 +25,15 @@ import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Queue; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import org.eclipse.jetty.util.annotation.Name; import org.eclipse.jetty.util.TypeUtil; +import org.eclipse.jetty.util.annotation.Name; import org.eclipse.jetty.util.component.AbstractLifeCycle; import org.eclipse.jetty.util.component.AggregateLifeCycle; import org.eclipse.jetty.util.component.Dumpable; @@ -54,7 +52,6 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa private final ManagedSelector[] _selectors; private volatile long _selectorIndex; - private volatile long _idleCheckPeriod; protected SelectorManager() { @@ -64,23 +61,6 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa protected SelectorManager(@Name(value="selectors") int selectors) { _selectors = new ManagedSelector[selectors]; - setIdleCheckPeriod(1000); - } - - /** - * @return the period, in milliseconds, a background thread checks for idle expiration - */ - public long getIdleCheckPeriod() - { - return _idleCheckPeriod; - } - - /** - * @param idleCheckPeriod the period, in milliseconds, a background thread checks for idle expiration - */ - public void setIdleCheckPeriod(long idleCheckPeriod) - { - _idleCheckPeriod = idleCheckPeriod; } /** @@ -145,7 +125,6 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa _selectors[i] = selectSet; selectSet.start(); execute(selectSet); - execute(new Expirer()); } } @@ -251,37 +230,6 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa AggregateLifeCycle.dump(out, indent, TypeUtil.asList(_selectors)); } - /** - *

The task that performs a periodic idle check of the connections managed by the selectors.

- * @see #getIdleCheckPeriod() - */ - private class Expirer implements Runnable - { - @Override - public void run() - { - while (isRunning()) - { - for (ManagedSelector selector : _selectors) - if (selector != null) - selector.timeoutCheck(); - sleep(getIdleCheckPeriod()); - } - } - - private void sleep(long delay) - { - try - { - Thread.sleep(delay); - } - catch (InterruptedException x) - { - LOG.ignore(x); - } - } - } - /** *

{@link ManagedSelector} wraps a {@link Selector} simplifying non-blocking operations on channels.

*

{@link ManagedSelector} runs the select loop, which waits on {@link Selector#select()} until events @@ -291,7 +239,6 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dumpable { private final Queue _changes = new ConcurrentLinkedQueue<>(); - private final Set _endPoints = Collections.newSetFromMap(new ConcurrentHashMap()); private final int _id; private Selector _selector; private Thread _thread; @@ -517,7 +464,6 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa private AsyncEndPoint createEndPoint(SocketChannel channel, SelectionKey selectionKey) throws IOException { AsyncEndPoint endPoint = newEndPoint(channel, this, selectionKey); - _endPoints.add(endPoint); endPointOpened(endPoint); AsyncConnection asyncConnection = newConnection(channel, endPoint, selectionKey.attachment()); endPoint.setAsyncConnection(asyncConnection); @@ -529,11 +475,10 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa public void destroyEndPoint(AsyncEndPoint endPoint) { LOG.debug("Destroyed {}", endPoint); - _endPoints.remove(endPoint); endPoint.getAsyncConnection().onClose(); endPointClosed(endPoint); } - + @Override public String dump() { @@ -597,15 +542,6 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1); } - private void timeoutCheck() - { - // We cannot use the _selector.keys() because the returned Set is not thread - // safe so it may be modified by the selector thread while we iterate here. - long now = System.currentTimeMillis(); - for (AsyncEndPoint endPoint : _endPoints) - endPoint.checkTimeout(now); - } - private class DumpKeys implements Runnable { private final CountDownLatch latch = new CountDownLatch(1); diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java index 11a2fd2e5a5..a9a1b90b5de 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java @@ -179,11 +179,6 @@ public class SslConnection extends AbstractAsyncConnection { } - @Override - public void checkTimeout(long now) - { - } - private final Callback _writeCallback = new Callback() { diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/AsyncByteArrayEndPointTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/AsyncByteArrayEndPointTest.java index 5b4b539426c..7a99925c715 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/AsyncByteArrayEndPointTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/AsyncByteArrayEndPointTest.java @@ -1,16 +1,9 @@ package org.eclipse.jetty.io; -import static junit.framework.Assert.assertEquals; -import static org.hamcrest.CoreMatchers.containsString; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - import java.nio.ByteBuffer; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeoutException; import org.eclipse.jetty.util.BufferUtil; @@ -20,85 +13,91 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import static junit.framework.Assert.assertEquals; +import static org.hamcrest.CoreMatchers.containsString; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + public class AsyncByteArrayEndPointTest { - ScheduledExecutorService _timer; - + private ScheduledExecutorService _scheduler; + @Before public void before() { - _timer = new ScheduledThreadPoolExecutor(1); + _scheduler = Executors.newSingleThreadScheduledExecutor(); } - + @After public void after() { - _timer.shutdownNow(); + _scheduler.shutdownNow(); } - + @Test public void testReadable() throws Exception { - AsyncByteArrayEndPoint endp = new AsyncByteArrayEndPoint(_timer); + AsyncByteArrayEndPoint endp = new AsyncByteArrayEndPoint(_scheduler, 5000); endp.setInput("test input"); ByteBuffer buffer = BufferUtil.allocate(1024); FutureCallback fcb = new FutureCallback<>(); - endp.fillInterested("CTX",fcb); + endp.fillInterested("CTX", fcb); assertTrue(fcb.isDone()); - assertEquals("CTX",fcb.get()); - assertEquals(10,endp.fill(buffer)); - assertEquals("test input",BufferUtil.toString(buffer)); + assertEquals("CTX", fcb.get()); + assertEquals(10, endp.fill(buffer)); + assertEquals("test input", BufferUtil.toString(buffer)); fcb = new FutureCallback<>(); - endp.fillInterested("CTX",fcb); + endp.fillInterested("CTX", fcb); assertFalse(fcb.isDone()); - assertEquals(0,endp.fill(buffer)); + assertEquals(0, endp.fill(buffer)); endp.setInput(" more"); assertTrue(fcb.isDone()); - assertEquals("CTX",fcb.get()); - assertEquals(5,endp.fill(buffer)); - assertEquals("test input more",BufferUtil.toString(buffer)); + assertEquals("CTX", fcb.get()); + assertEquals(5, endp.fill(buffer)); + assertEquals("test input more", BufferUtil.toString(buffer)); fcb = new FutureCallback<>(); - endp.fillInterested("CTX",fcb); + endp.fillInterested("CTX", fcb); assertFalse(fcb.isDone()); - assertEquals(0,endp.fill(buffer)); + assertEquals(0, endp.fill(buffer)); endp.setInput((ByteBuffer)null); assertTrue(fcb.isDone()); - assertEquals("CTX",fcb.get()); - assertEquals(-1,endp.fill(buffer)); + assertEquals("CTX", fcb.get()); + assertEquals(-1, endp.fill(buffer)); fcb = new FutureCallback<>(); - endp.fillInterested("CTX",fcb); + endp.fillInterested("CTX", fcb); assertTrue(fcb.isDone()); - assertEquals("CTX",fcb.get()); - assertEquals(-1,endp.fill(buffer)); + assertEquals("CTX", fcb.get()); + assertEquals(-1, endp.fill(buffer)); endp.close(); fcb = new FutureCallback<>(); - endp.fillInterested("CTX",fcb); + endp.fillInterested("CTX", fcb); assertTrue(fcb.isDone()); try { fcb.get(); fail(); } - catch(ExecutionException e) + catch (ExecutionException e) { - assertThat(e.toString(),containsString("Closed")); + assertThat(e.toString(), containsString("Closed")); } - } @Test public void testWrite() throws Exception { - AsyncByteArrayEndPoint endp = new AsyncByteArrayEndPoint(_timer,(byte[])null,15); + AsyncByteArrayEndPoint endp = new AsyncByteArrayEndPoint(_scheduler, 5000, (byte[])null, 15); endp.setGrowOutput(false); endp.setOutput(BufferUtil.allocate(10)); @@ -106,28 +105,27 @@ public class AsyncByteArrayEndPointTest ByteBuffer more = BufferUtil.toBuffer(" Some more."); FutureCallback fcb = new FutureCallback<>(); - endp.write("CTX",fcb,data); + endp.write("CTX", fcb, data); assertTrue(fcb.isDone()); - assertEquals("CTX",fcb.get()); - assertEquals("Data.",endp.getOutputString()); + assertEquals("CTX", fcb.get()); + assertEquals("Data.", endp.getOutputString()); fcb = new FutureCallback<>(); - endp.write("CTX",fcb,more); + endp.write("CTX", fcb, more); assertFalse(fcb.isDone()); - assertEquals("Data. Some",endp.getOutputString()); - assertEquals("Data. Some",endp.takeOutputString()); + assertEquals("Data. Some", endp.getOutputString()); + assertEquals("Data. Some", endp.takeOutputString()); assertTrue(fcb.isDone()); - assertEquals("CTX",fcb.get()); - assertEquals(" more.",endp.getOutputString()); + assertEquals("CTX", fcb.get()); + assertEquals(" more.", endp.getOutputString()); } @Test public void testIdle() throws Exception { - AsyncByteArrayEndPoint endp = new AsyncByteArrayEndPoint(_timer); - endp.setIdleTimeout(500); + AsyncByteArrayEndPoint endp = new AsyncByteArrayEndPoint(_scheduler, 500); endp.setInput("test"); endp.setGrowOutput(false); endp.setOutput(BufferUtil.allocate(5)); @@ -141,43 +139,43 @@ public class AsyncByteArrayEndPointTest ByteBuffer buffer = BufferUtil.allocate(1024); FutureCallback fcb = new FutureCallback<>(); - endp.fillInterested(null,fcb); + endp.fillInterested(null, fcb); assertTrue(fcb.isDone()); - assertEquals(null,fcb.get()); - assertEquals(4,endp.fill(buffer)); - assertEquals("test",BufferUtil.toString(buffer)); + assertEquals(null, fcb.get()); + assertEquals(4, endp.fill(buffer)); + assertEquals("test", BufferUtil.toString(buffer)); // read timeout fcb = new FutureCallback<>(); - endp.fillInterested(null,fcb); - long start=System.currentTimeMillis(); + endp.fillInterested(null, fcb); + long start = System.currentTimeMillis(); try { fcb.get(); fail(); } - catch(ExecutionException t) + catch (ExecutionException t) { - assertThat(t.getCause(),Matchers.instanceOf(TimeoutException.class)); + assertThat(t.getCause(), Matchers.instanceOf(TimeoutException.class)); } - assertThat(System.currentTimeMillis()-start,Matchers.greaterThan(100L)); + assertThat(System.currentTimeMillis() - start, Matchers.greaterThan(100L)); assertTrue(endp.isOpen()); // write timeout fcb = new FutureCallback<>(); - start=System.currentTimeMillis(); + start = System.currentTimeMillis(); - endp.write(null,fcb,BufferUtil.toBuffer("This is too long")); + endp.write(null, fcb, BufferUtil.toBuffer("This is too long")); try { fcb.get(); fail(); } - catch(ExecutionException t) + catch (ExecutionException t) { - assertThat(t.getCause(),Matchers.instanceOf(TimeoutException.class)); + assertThat(t.getCause(), Matchers.instanceOf(TimeoutException.class)); } - assertThat(System.currentTimeMillis()-start,Matchers.greaterThan(100L)); + assertThat(System.currentTimeMillis() - start, Matchers.greaterThan(100L)); assertTrue(endp.isOpen()); // Still no idle close @@ -190,8 +188,5 @@ public class AsyncByteArrayEndPointTest // idle close Thread.sleep(1000); assertFalse(endp.isOpen()); - } - - } diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java index ab656e88dad..da9f3e51354 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java @@ -1,3 +1,19 @@ +// ======================================================================== +// Copyright 2011-2012 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +//======================================================================== + package org.eclipse.jetty.io; import java.io.BufferedInputStream; @@ -13,6 +29,8 @@ import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.eclipse.jetty.util.BufferUtil; @@ -35,6 +53,7 @@ public class SelectChannelEndPointTest protected volatile AsyncEndPoint _lastEndp; protected ServerSocketChannel _connector; protected QueuedThreadPool _threadPool = new QueuedThreadPool(); + protected ScheduledExecutorService _scheduler = Executors.newSingleThreadScheduledExecutor(); protected SelectorManager _manager = new SelectorManager() { @Override @@ -46,27 +65,27 @@ public class SelectChannelEndPointTest @Override public AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment) { - return SelectChannelEndPointTest.this.newConnection(channel,endpoint); + return SelectChannelEndPointTest.this.newConnection(channel, endpoint); } @Override protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey selectionKey) throws IOException { - SelectChannelEndPoint endp = new SelectChannelEndPoint(channel,selectSet, selectionKey, 60000); - _lastEndp=endp; + SelectChannelEndPoint endp = new SelectChannelEndPoint(channel, selectSet, selectionKey, _scheduler, 60000); + _lastEndp = endp; return endp; } }; // Must be volatile or the test may fail spuriously - protected volatile int _blockAt=0; - private volatile int _writeCount=1; + protected volatile int _blockAt = 0; + private volatile int _writeCount = 1; @Before public void startManager() throws Exception { - _writeCount=1; - _lastEndp=null; + _writeCount = 1; + _lastEndp = null; _connector = ServerSocketChannel.open(); _connector.socket().bind(null); _threadPool.start(); @@ -83,7 +102,7 @@ public class SelectChannelEndPointTest protected Socket newClient() throws IOException { - return new Socket(_connector.socket().getInetAddress(),_connector.socket().getLocalPort()); + return new Socket(_connector.socket().getInetAddress(), _connector.socket().getLocalPort()); } protected AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint) @@ -93,13 +112,13 @@ public class SelectChannelEndPointTest public class TestConnection extends AbstractAsyncConnection { - ByteBuffer _in = BufferUtil.allocate(32*1024); - ByteBuffer _out = BufferUtil.allocate(32*1024); - long _last=-1; + ByteBuffer _in = BufferUtil.allocate(32 * 1024); + ByteBuffer _out = BufferUtil.allocate(32 * 1024); + long _last = -1; public TestConnection(AsyncEndPoint endp) { - super(endp,_threadPool); + super(endp, _threadPool); } @Override @@ -108,45 +127,45 @@ public class SelectChannelEndPointTest AsyncEndPoint _endp = getEndPoint(); try { - _last=System.currentTimeMillis(); - boolean progress=true; - while(progress) + _last = System.currentTimeMillis(); + boolean progress = true; + while (progress) { - progress=false; + progress = false; // Fill the input buffer with everything available if (BufferUtil.isFull(_in)) - throw new IllegalStateException("FULL "+BufferUtil.toDetailString(_in)); - int filled=_endp.fill(_in); - if (filled>0) - progress=true; + throw new IllegalStateException("FULL " + BufferUtil.toDetailString(_in)); + int filled = _endp.fill(_in); + if (filled > 0) + progress = true; // If the tests wants to block, then block - while (_blockAt>0 && _endp.isOpen() && _in.remaining()<_blockAt) + while (_blockAt > 0 && _endp.isOpen() && _in.remaining() < _blockAt) { - FutureCallback blockingRead= new FutureCallback<>(); - _endp.fillInterested(null,blockingRead); + FutureCallback blockingRead = new FutureCallback<>(); + _endp.fillInterested(null, blockingRead); blockingRead.get(); - filled=_endp.fill(_in); - progress|=filled>0; + filled = _endp.fill(_in); + progress |= filled > 0; } // Copy to the out buffer - if (BufferUtil.hasContent(_in) && BufferUtil.append(_in,_out)>0) - progress=true; + if (BufferUtil.hasContent(_in) && BufferUtil.append(_in, _out) > 0) + progress = true; // Blocking writes if (BufferUtil.hasContent(_out)) { - ByteBuffer out=_out.duplicate(); + ByteBuffer out = _out.duplicate(); BufferUtil.clear(_out); - for (int i=0;i<_writeCount;i++) + for (int i = 0; i < _writeCount; i++) { - FutureCallback blockingWrite= new FutureCallback<>(); - _endp.write(null,blockingWrite,out.asReadOnlyBuffer()); + FutureCallback blockingWrite = new FutureCallback<>(); + _endp.write(null, blockingWrite, out.asReadOnlyBuffer()); blockingWrite.get(); } - progress=true; + progress = true; } // are we done? @@ -154,26 +173,26 @@ public class SelectChannelEndPointTest _endp.shutdownOutput(); } } - catch(ExecutionException e) + catch (ExecutionException e) { // Timeout does not close, so echo exception then shutdown try { - FutureCallback blockingWrite= new FutureCallback<>(); - _endp.write(null,blockingWrite,BufferUtil.toBuffer("EE: "+BufferUtil.toString(_in))); + FutureCallback blockingWrite = new FutureCallback<>(); + _endp.write(null, blockingWrite, BufferUtil.toBuffer("EE: " + BufferUtil.toString(_in))); blockingWrite.get(); _endp.shutdownOutput(); } - catch(Exception e2) + catch (Exception e2) { // e2.printStackTrace(); } } - catch(InterruptedException|EofException e) + catch (InterruptedException | EofException e) { SelectChannelEndPoint.LOG.ignore(e); } - catch(Exception e) + catch (Exception e) { SelectChannelEndPoint.LOG.warn(e); } @@ -204,21 +223,21 @@ public class SelectChannelEndPointTest for (char c : "HelloWorld".toCharArray()) { int b = client.getInputStream().read(); - assertTrue(b>0); - assertEquals(c,(char)b); + assertTrue(b > 0); + assertEquals(c, (char)b); } // wait for read timeout client.setSoTimeout(500); - long start=System.currentTimeMillis(); + long start = System.currentTimeMillis(); try { client.getInputStream().read(); Assert.fail(); } - catch(SocketTimeoutException e) + catch (SocketTimeoutException e) { - long duration = System.currentTimeMillis()-start; + long duration = System.currentTimeMillis() - start; Assert.assertThat("timeout duration", duration, greaterThanOrEqualTo(400L)); } @@ -230,11 +249,11 @@ public class SelectChannelEndPointTest { int b = client.getInputStream().read(); Assert.assertThat("expect valid char integer", b, greaterThan(0)); - assertEquals("expect characters to be same", c,(char)b); + assertEquals("expect characters to be same", c, (char)b); } client.close(); - int i=0; + int i = 0; while (server.isOpen()) { Thread.sleep(10); @@ -262,20 +281,20 @@ public class SelectChannelEndPointTest for (char c : "HelloWorld".toCharArray()) { int b = client.getInputStream().read(); - assertTrue(b>0); - assertEquals(c,(char)b); + assertTrue(b > 0); + assertEquals(c, (char)b); } // wait for read timeout - long start=System.currentTimeMillis(); + long start = System.currentTimeMillis(); try { client.getInputStream().read(); Assert.fail(); } - catch(SocketTimeoutException e) + catch (SocketTimeoutException e) { - assertTrue(System.currentTimeMillis()-start>=400); + assertTrue(System.currentTimeMillis() - start >= 400); } // write then shutdown @@ -287,17 +306,14 @@ public class SelectChannelEndPointTest for (char c : "Goodbye Cruel TLS".toCharArray()) { int b = client.getInputStream().read(); - assertTrue(b>0); - assertEquals(c,(char)b); + assertTrue(b > 0); + assertEquals(c, (char)b); } // Read close - assertEquals(-1,client.getInputStream().read()); - + assertEquals(-1, client.getInputStream().read()); } - - @Test public void testBlockRead() throws Exception { @@ -315,27 +331,27 @@ public class SelectChannelEndPointTest client.setSoTimeout(specifiedTimeout); // Write 8 and cause block waiting for 10 - _blockAt=10; + _blockAt = 10; clientOutputStream.write("12345678".getBytes("UTF-8")); clientOutputStream.flush(); - long wait=System.currentTimeMillis()+1000; - while(_lastEndp==null && System.currentTimeMillis()0); - assertEquals(c,(char)b); + assertTrue(b > 0); + assertEquals(c, (char)b); } } @@ -370,20 +386,20 @@ public class SelectChannelEndPointTest for (char c : "HelloWorld".toCharArray()) { int b = client.getInputStream().read(); - assertTrue(b>0); - assertEquals(c,(char)b); + assertTrue(b > 0); + assertEquals(c, (char)b); } // Set Max idle _lastEndp.setIdleTimeout(500); // read until idle shutdown received - long start=System.currentTimeMillis(); - int b=client.getInputStream().read(); - assertEquals(-1,b); - long idle=System.currentTimeMillis()-start; - assertTrue(idle>400); - assertTrue(idle<2000); + long start = System.currentTimeMillis(); + int b = client.getInputStream().read(); + assertEquals(-1, b); + long idle = System.currentTimeMillis() - start; + assertTrue(idle > 400); + assertTrue(idle < 2000); // But endpoint may still be open for a little bit. if (_lastEndp.isOpen()) @@ -414,31 +430,31 @@ public class SelectChannelEndPointTest for (char c : "HelloWorld".toCharArray()) { int b = client.getInputStream().read(); - assertTrue(b>0); - assertEquals(c,(char)b); + assertTrue(b > 0); + assertEquals(c, (char)b); } // Set Max idle _lastEndp.setIdleTimeout(500); // Write 8 and cause block waiting for 10 - _blockAt=10; + _blockAt = 10; clientOutputStream.write("12345678".getBytes("UTF-8")); clientOutputStream.flush(); // read until idle shutdown received - long start=System.currentTimeMillis(); - int b=client.getInputStream().read(); - assertEquals('E',b); - long idle=System.currentTimeMillis()-start; - assertTrue(idle>400); - assertTrue(idle<2000); + long start = System.currentTimeMillis(); + int b = client.getInputStream().read(); + assertEquals('E', b); + long idle = System.currentTimeMillis() - start; + assertTrue(idle > 400); + assertTrue(idle < 2000); for (char c : "E: 12345678".toCharArray()) { b = client.getInputStream().read(); - assertTrue(b>0); - assertEquals(c,(char)b); + assertTrue(b > 0); + assertEquals(c, (char)b); } // But endpoint is still open. @@ -451,7 +467,6 @@ public class SelectChannelEndPointTest assertFalse(_lastEndp.isOpen()); } - @Test public void testStress() throws Exception { @@ -464,8 +479,8 @@ public class SelectChannelEndPointTest _manager.accept(server); final int writes = 200000; - final byte[] bytes="HelloWorld-".getBytes(StringUtil.__UTF8_CHARSET); - byte[] count="0\n".getBytes(StringUtil.__UTF8_CHARSET); + final byte[] bytes = "HelloWorld-".getBytes(StringUtil.__UTF8_CHARSET); + byte[] count = "0\n".getBytes(StringUtil.__UTF8_CHARSET); BufferedOutputStream out = new BufferedOutputStream(client.getOutputStream()); final CountDownLatch latch = new CountDownLatch(writes); final InputStream in = new BufferedInputStream(client.getInputStream()); @@ -474,7 +489,7 @@ public class SelectChannelEndPointTest out.write(count); out.flush(); - while (_lastEndp==null) + while (_lastEndp == null) Thread.sleep(10); _lastEndp.setIdleTimeout(5000); @@ -484,28 +499,28 @@ public class SelectChannelEndPointTest public void run() { Thread.currentThread().setPriority(MAX_PRIORITY); - long last=-1; - int count=-1; + long last = -1; + int count = -1; try { - while (latch.getCount()>0) + while (latch.getCount() > 0) { // Verify echo server to client for (byte b0 : bytes) { int b = in.read(); - Assert.assertThat(b,greaterThan(0)); - assertEquals(0xff&b0,b); + Assert.assertThat(b, greaterThan(0)); + assertEquals(0xff & b0, b); } - count=0; - int b=in.read(); - while(b>0 && b!='\n') + count = 0; + int b = in.read(); + while (b > 0 && b != '\n') { - count=count*10+(b-'0'); - b=in.read(); + count = count * 10 + (b - '0'); + b = in.read(); } - last=System.currentTimeMillis(); + last = System.currentTimeMillis(); //if (latch.getCount()%1000==0) // System.out.println(writes-latch.getCount()); @@ -513,16 +528,16 @@ public class SelectChannelEndPointTest latch.countDown(); } } - catch(Throwable e) + catch (Throwable e) { long now = System.currentTimeMillis(); - System.err.println("count="+count); - System.err.println("latch="+latch.getCount()); - System.err.println("time="+(now-start)); - System.err.println("last="+(now-last)); - System.err.println("endp="+_lastEndp); - System.err.println("conn="+_lastEndp.getAsyncConnection()); + System.err.println("count=" + count); + System.err.println("latch=" + latch.getCount()); + System.err.println("time=" + (now - start)); + System.err.println("last=" + (now - last)); + System.err.println("endp=" + _lastEndp); + System.err.println("conn=" + _lastEndp.getAsyncConnection()); e.printStackTrace(); } @@ -530,12 +545,12 @@ public class SelectChannelEndPointTest }.start(); // Write client to server - for (int i=1;i0); - assertEquals("test-"+i+"/"+j,c,(char)b); + assertTrue(b > 0); + assertEquals("test-" + i + "/" + j, c, (char)b); } - if (i==0) + if (i == 0) _lastEndp.setIdleTimeout(60000); } client.close(); - int i=0; + int i = 0; while (server.isOpen()) { - assert(i++<10); + assert (i++ < 10); Thread.sleep(10); } - } - } diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/SslConnectionTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/SslConnectionTest.java index 681c4297d65..93811256c9b 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/SslConnectionTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/SslConnectionTest.java @@ -10,6 +10,8 @@ import java.nio.channels.SelectionKey; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLSocket; @@ -36,6 +38,7 @@ public class SslConnectionTest protected volatile AsyncEndPoint _lastEndp; protected ServerSocketChannel _connector; protected QueuedThreadPool _threadPool = new QueuedThreadPool(); + protected ScheduledExecutorService _scheduler = Executors.newSingleThreadScheduledExecutor(); protected SelectorManager _manager = new SelectorManager() { @Override @@ -61,7 +64,7 @@ public class SslConnectionTest @Override protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey selectionKey) throws IOException { - SelectChannelEndPoint endp = new SelectChannelEndPoint(channel,selectSet, selectionKey, 60000); + SelectChannelEndPoint endp = new SelectChannelEndPoint(channel,selectSet, selectionKey, _scheduler, 60000); _lastEndp=endp; return endp; } diff --git a/jetty-rewrite/src/test/resources/org.mortbay.jetty.rewrite.handler/jetty-rewrite.xml b/jetty-rewrite/src/test/resources/org.mortbay.jetty.rewrite.handler/jetty-rewrite.xml index b06d0383699..f5b400133a2 100644 --- a/jetty-rewrite/src/test/resources/org.mortbay.jetty.rewrite.handler/jetty-rewrite.xml +++ b/jetty-rewrite/src/test/resources/org.mortbay.jetty.rewrite.handler/jetty-rewrite.xml @@ -194,7 +194,7 @@ - + @@ -217,7 +217,7 @@ - + diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java index dd6065fcc44..425c0b6e8d5 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java @@ -36,103 +36,77 @@ import org.eclipse.jetty.util.log.Logger; *

  • Base acceptor thread
  • *
  • Optional reverse proxy headers checking
  • * - * - * */ public abstract class AbstractConnector extends AggregateLifeCycle implements Connector, Dumpable { - static final Logger LOG = Log.getLogger(AbstractConnector.class); - - private final Thread[] _acceptors; - - private String _name; - private Server _server; - private Executor _executor; - private String _host; - private int _port = 0; - private int _acceptQueueSize = 0; - private int _acceptorPriorityOffset = 0; - private boolean _reuseAddress = true; - private ByteBufferPool _byteBufferPool=new StandardByteBufferPool(); // TODO should this be server wide? or a thread local one? + protected final Logger LOG = Log.getLogger(getClass()); private final Statistics _stats = new ConnectionStatistics(); + private final Thread[] _acceptors; + private volatile String _name; + private volatile Server _server; + private volatile Executor _executor; + private volatile int _acceptQueueSize = 128; + private volatile boolean _reuseAddress = true; + private volatile ByteBufferPool _byteBufferPool; + private volatile long _idleTimeout = 200000; + private volatile int _soLingerTime = -1; - protected long _idleTimeout = 200000; - protected int _soLingerTime = -1; - - - - /* ------------------------------------------------------------ */ - /** - */ public AbstractConnector() { - this(Math.max(1,(Runtime.getRuntime().availableProcessors())/4)); + this(Math.max(1, (Runtime.getRuntime().availableProcessors()) / 4)); } - /* ------------------------------------------------------------ */ - /** - */ - public AbstractConnector(@Name(value="acceptors") int acceptors) + public AbstractConnector(@Name("acceptors") int acceptors) { if (acceptors > 2 * Runtime.getRuntime().availableProcessors()) - LOG.warn("Acceptors should be <=2*availableProcessors: " + this); - _acceptors=new Thread[acceptors]; + LOG.warn("Acceptors should be <= 2*availableProcessors: " + this); + _acceptors = new Thread[acceptors]; } - /* ------------------------------------------------------------ */ @Override public Statistics getStatistics() { return _stats; } - /* ------------------------------------------------------------ */ - /* - */ @Override public Server getServer() { return _server; } - /* ------------------------------------------------------------ */ public void setServer(Server server) { _server = server; } - /* ------------------------------------------------------------ */ public Executor findExecutor() { - if (_executor==null && getServer()!=null) + if (_executor == null && getServer() != null) return getServer().getThreadPool(); return _executor; } - /* ------------------------------------------------------------ */ @Override public Executor getExecutor() { return _executor; } - /* ------------------------------------------------------------ */ public void setExecutor(Executor executor) { removeBean(_executor); - _executor=executor; + _executor = executor; addBean(_executor); } - /* ------------------------------------------------------------ */ @Override public ByteBufferPool getByteBufferPool() { return _byteBufferPool; } - /* ------------------------------------------------------------ */ public void setByteBufferPool(ByteBufferPool byteBufferPool) { removeBean(byteBufferPool); @@ -140,53 +114,6 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co addBean(_byteBufferPool); } - /* ------------------------------------------------------------ */ - public void setHost(String host) - { - if (this instanceof NetConnector) - _host = host; - else - throw new UnsupportedOperationException(); - } - - /* ------------------------------------------------------------ */ - public String getHost() - { - return _host; - } - - /* ------------------------------------------------------------ */ - public void setPort(int port) - { - if (this instanceof NetConnector) - _port = port; - else - throw new UnsupportedOperationException(); - } - - /* ------------------------------------------------------------ */ - public int getPort() - { - return _port; - } - - /* ------------------------------------------------------------ */ - public void open() throws IOException - { - } - - /* ------------------------------------------------------------ */ - public void close() throws IOException - { - } - - /* ------------------------------------------------------------ */ - public int getLocalPort() - { - return -1; - } - - /* ------------------------------------------------------------ */ /** * @return Returns the maxIdleTime. */ @@ -196,7 +123,6 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co return _idleTimeout; } - /* ------------------------------------------------------------ */ /** * Set the maximum Idle time for a connection, which roughly translates to the {@link Socket#setSoTimeout(int)} call, although with NIO implementations * other mechanisms may be used to implement the timeout. The max idle time is applied: @@ -209,19 +135,17 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co * timeout (if implemented by jetty) is reset. However, in many instances, the reading/writing is delegated to the JVM, and the semantic is more strictly * enforced as the maximum time a single read/write operation can take. Note, that as Jetty supports writes of memory mapped file buffers, then a write may * take many 10s of seconds for large content written to a slow device. - *

    + *

    * Previously, Jetty supported separate idle timeouts and IO operation timeouts, however the expense of changing the value of soTimeout was significant, so * these timeouts were merged. With the advent of NIO, it may be possible to again differentiate these values (if there is demand). * - * @param idleTimeout - * The idleTimeout to set. + * @param idleTimeout The idleTimeout to set. */ public void setIdleTimeout(long idleTimeout) { _idleTimeout = idleTimeout; } - /* ------------------------------------------------------------ */ /** * @return Returns the soLingerTime. */ @@ -230,7 +154,6 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co return _soLingerTime; } - /* ------------------------------------------------------------ */ /** * @return Returns the acceptQueueSize. */ @@ -239,17 +162,14 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co return _acceptQueueSize; } - /* ------------------------------------------------------------ */ /** - * @param acceptQueueSize - * The acceptQueueSize to set. + * @param acceptQueueSize The acceptQueueSize to set. */ public void setAcceptQueueSize(int acceptQueueSize) { _acceptQueueSize = acceptQueueSize; } - /* ------------------------------------------------------------ */ /** * @return Returns the number of acceptor threads. */ @@ -259,30 +179,21 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co } - /* ------------------------------------------------------------ */ /** - * @param soLingerTime - * The soLingerTime to set or -1 to disable. + * @param soLingerTime The soLingerTime to set or -1 to disable. */ public void setSoLingerTime(int soLingerTime) { _soLingerTime = soLingerTime; } - /* ------------------------------------------------------------ */ @Override protected void doStart() throws Exception { if (_server == null) throw new IllegalStateException("No server"); - if (_name==null) - _name = (getHost() == null?"0.0.0.0":getHost()) + ":" + getPort(); - - // open listener port - open(); - - _name=_name+"/"+getLocalPort(); + _byteBufferPool = new StandardByteBufferPool(); super.doStart(); @@ -293,22 +204,12 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co findExecutor().execute(new Acceptor(i)); } - LOG.info("Started {}",this); + LOG.info("Started {}", this); } - /* ------------------------------------------------------------ */ @Override protected void doStop() throws Exception { - try - { - close(); - } - catch (IOException e) - { - LOG.warn(e); - } - super.doStop(); for (Thread thread : _acceptors) @@ -317,12 +218,11 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co thread.interrupt(); } - int i=_name.lastIndexOf("/"); - if (i>0) - _name=_name.substring(0,i); + int i = _name.lastIndexOf("/"); + if (i > 0) + _name = _name.substring(0, i); } - /* ------------------------------------------------------------ */ public void join() throws InterruptedException { for (Thread thread : _acceptors) @@ -330,16 +230,15 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co thread.join(); } - /* ------------------------------------------------------------ */ protected void configure(Socket socket) { try { socket.setTcpNoDelay(true); if (_soLingerTime >= 0) - socket.setSoLinger(true,_soLingerTime / 1000); + socket.setSoLinger(true, _soLingerTime / 1000); else - socket.setSoLinger(false,0); + socket.setSoLinger(false, 0); } catch (Exception e) { @@ -347,21 +246,8 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co } } - /* ------------------------------------------------------------ */ protected abstract void accept(int acceptorID) throws IOException, InterruptedException; - /* ------------------------------------------------------------ */ - @Override - public String toString() - { - return String.format("%s@%s:%d", - getClass().getSimpleName(), - getHost()==null?"0.0.0.0":getHost(), - getLocalPort()<=0?getPort():getLocalPort()); - } - - /* ------------------------------------------------------------ */ - /* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */ private class Acceptor implements Runnable { @@ -372,7 +258,6 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co _acceptor = id; } - /* ------------------------------------------------------------ */ @Override public void run() { @@ -391,7 +276,7 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co try { - current.setPriority(old_priority - _acceptorPriorityOffset); + current.setPriority(old_priority); while (isRunning() && getTransport() != null) { try @@ -421,43 +306,37 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co } } - /* ------------------------------------------------------------ */ @Override public String getName() { return _name; } - /* ------------------------------------------------------------ */ public void setName(String name) { _name = name; } - /* ------------------------------------------------------------ */ protected void connectionOpened(AsyncConnection connection) { _stats.connectionOpened(); } - /* ------------------------------------------------------------ */ protected void connectionUpgraded(AsyncConnection oldConnection, AsyncConnection newConnection) { long duration = System.currentTimeMillis() - oldConnection.getEndPoint().getCreatedTimeStamp(); - int requests = (oldConnection instanceof HttpConnection)?((HttpConnection)oldConnection).getHttpChannel().getRequests():0; - _stats.connectionUpgraded(duration,requests,requests); + int requests = (oldConnection instanceof HttpConnection) ? ((HttpConnection)oldConnection).getHttpChannel().getRequests() : 0; + _stats.connectionUpgraded(duration, requests, requests); } - /* ------------------------------------------------------------ */ protected void connectionClosed(AsyncConnection connection) { long duration = System.currentTimeMillis() - connection.getEndPoint().getCreatedTimeStamp(); // TODO: remove casts to HttpConnection - int requests = (connection instanceof HttpConnection)?((HttpConnection)connection).getHttpChannel().getRequests():0; - _stats.connectionClosed(duration,requests,requests); + int requests = (connection instanceof HttpConnection) ? ((HttpConnection)connection).getHttpChannel().getRequests() : 0; + _stats.connectionClosed(duration, requests, requests); } - /* ------------------------------------------------------------ */ /** * @return True if the the server socket will be opened in SO_REUSEADDR mode. */ @@ -466,10 +345,8 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co return _reuseAddress; } - /* ------------------------------------------------------------ */ /** - * @param reuseAddress - * True if the the server socket will be opened in SO_REUSEADDR mode. + * @param reuseAddress True if the the server socket will be opened in SO_REUSEADDR mode. */ public void setReuseAddress(boolean reuseAddress) { diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractNetConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractNetConnector.java new file mode 100644 index 00000000000..01f905bd91c --- /dev/null +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractNetConnector.java @@ -0,0 +1,100 @@ +// ======================================================================== +// Copyright (c) 2012-2012 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== + +package org.eclipse.jetty.server; + +import java.io.IOException; + +public abstract class AbstractNetConnector extends AbstractConnector implements Connector.NetConnector +{ + private volatile String _host; + private volatile int _port = 0; + + protected AbstractNetConnector() + { + } + + protected AbstractNetConnector(int acceptors) + { + super(acceptors); + } + + public void setHost(String host) + { + _host = host; + } + + public String getHost() + { + return _host; + } + + public void setPort(int port) + { + _port = port; + } + + public int getPort() + { + return _port; + } + + public int getLocalPort() + { + return -1; + } + + @Override + protected void doStart() throws Exception + { + if (getName() == null) + setName(getHost() == null ? "0.0.0.0" : getHost() + ":" + getPort()); + + open(); + + setName(getName() + "/" + getLocalPort()); + + super.doStart(); + } + + @Override + protected void doStop() throws Exception + { + try + { + close(); + } + catch (IOException e) + { + LOG.warn(e); + } + super.doStop(); + } + + public void open() throws IOException + { + } + + public void close() throws IOException + { + } + + @Override + public String toString() + { + return String.format("%s@%s:%d", + getClass().getSimpleName(), + getHost() == null ? "0.0.0.0" : getHost(), + getLocalPort() <= 0 ? getPort() : getLocalPort()); + } +} diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/Connector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/Connector.java index ea5468819a8..000dbad3252 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/Connector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/Connector.java @@ -65,7 +65,7 @@ public interface Connector extends LifeCycle /* ------------------------------------------------------------ */ Statistics getStatistics(); - interface NetConnector extends Connector + interface NetConnector extends Connector, AutoCloseable { /* ------------------------------------------------------------ */ /** @@ -75,7 +75,7 @@ public interface Connector extends LifeCycle void open() throws IOException; /* ------------------------------------------------------------ */ - void close(); + void close() throws IOException; /* ------------------------------------------------------------ */ /** diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java index 4df499fe9c4..5807f5fa148 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java @@ -18,8 +18,7 @@ import java.io.InputStream; import java.io.PrintWriter; import java.net.InetSocketAddress; import java.nio.ByteBuffer; -import java.util.Timer; - +import java.util.concurrent.ScheduledExecutorService; import javax.servlet.DispatcherType; import javax.servlet.ServletException; import javax.servlet.ServletInputStream; @@ -55,7 +54,7 @@ public abstract class HttpChannel { static final Logger LOG = Log.getLogger(HttpChannel.class); - private static final ThreadLocal __currentChannel = new ThreadLocal(); + private static final ThreadLocal __currentChannel = new ThreadLocal<>(); /* ------------------------------------------------------------ */ public static HttpChannel getCurrentHttpChannel() @@ -68,8 +67,8 @@ public abstract class HttpChannel { __currentChannel.set(channel); } - - + + private final Server _server; private final AsyncConnection _connection; @@ -77,7 +76,7 @@ public abstract class HttpChannel private final ChannelEventHandler _handler = new ChannelEventHandler(); private final HttpChannelState _state; - + private final HttpFields _requestFields; private final Request _request; private final HttpInput _in; @@ -90,19 +89,16 @@ public abstract class HttpChannel private int _requests; private int _include; - + private HttpVersion _version = HttpVersion.HTTP_1_1; private boolean _expect = false; private boolean _expect100Continue = false; private boolean _expect102Processing = false; private boolean _host = false; - - + + /* ------------------------------------------------------------ */ - /** Constructor - * - */ public HttpChannel(Server server,AsyncConnection connection,HttpInput input) { _server = server; @@ -116,31 +112,31 @@ public abstract class HttpChannel _in=input; _out=new Output(); } - + /* ------------------------------------------------------------ */ public HttpChannelState getState() { return _state; } - + /* ------------------------------------------------------------ */ public EventHandler getEventHandler() { return _handler; } - + /* ------------------------------------------------------------ */ public AsyncEndPoint getEndPoint() { return getConnection().getEndPoint(); } - + /* ------------------------------------------------------------ */ public boolean isIdle() { return _state.isIdle(); } - + /* ------------------------------------------------------------ */ /** * @return the number of requests handled by this connection @@ -197,7 +193,7 @@ public abstract class HttpChannel { return _connection; } - + /* ------------------------------------------------------------ */ public InetSocketAddress getLocalAddress() { @@ -209,7 +205,7 @@ public abstract class HttpChannel { return _connection.getEndPoint().getRemoteAddress(); } - + /* ------------------------------------------------------------ */ /** * Get the inputStream from the connection. @@ -220,6 +216,7 @@ public abstract class HttpChannel * * @return The input stream for this connection. * The stream will be created if it does not already exist. + * @throws IOException if the InputStream cannot be created */ public ServletInputStream getInputStream() throws IOException { @@ -251,6 +248,7 @@ public abstract class HttpChannel /* ------------------------------------------------------------ */ /** + * @param charset the character set for the PrintWriter * @return A {@link PrintWriter} wrapping the {@link #getOutputStream output stream}. The writer is created if it * does not already exist. */ @@ -319,7 +317,7 @@ public abstract class HttpChannel // Loop here to handle async request redispatches. // The loop is controlled by the call to async.unhandle in the // finally block below. Unhandle will return false only if an async dispatch has - // already happened when unhandle is called. + // already happened when unhandle is called. boolean handling=_state.handling(); while(handling && getServer().isRunning()) @@ -328,7 +326,7 @@ public abstract class HttpChannel { _request.setHandled(false); _out.reopen(); - + if (_state.isInitial()) { _request.setDispatcherType(DispatcherType.REQUEST); @@ -340,7 +338,7 @@ public abstract class HttpChannel _request.setDispatcherType(DispatcherType.ASYNC); getServer().handleAsync(this); } - + } catch (ContinuationThrowable e) { @@ -377,7 +375,7 @@ public abstract class HttpChannel __currentChannel.set(null); if (threadName!=null) Thread.currentThread().setName(threadName); - + if (_state.isCompleting()) { try @@ -401,7 +399,7 @@ public abstract class HttpChannel // Complete generating the response _response.complete(); - + // Complete reading the request _in.consumeAll(); } @@ -420,7 +418,7 @@ public abstract class HttpChannel completed(); } } - + LOG.debug("{} !process",this); } } @@ -429,10 +427,10 @@ public abstract class HttpChannel protected boolean commitError(final int status, final String reason, String content) { LOG.debug("{} sendError {} {}",this,status,reason); - + if (_response.isCommitted()) return false; - + try { _response.setStatus(status,reason); @@ -447,7 +445,7 @@ public abstract class HttpChannel HttpGenerator.ResponseInfo info = _handler.commit(); commit(info,buffer); - + return true; } catch(Exception e) @@ -481,11 +479,8 @@ public abstract class HttpChannel _include--; _out.reopen(); } - + /* ------------------------------------------------------------ */ - /** - * @see org.eclipse.jetty.io.AsyncConnection#isSuspended() - */ public boolean isSuspended() { return _request.getAsyncContinuation().isSuspended(); @@ -508,7 +503,7 @@ public abstract class HttpChannel { return _expect102Processing; } - + /* ------------------------------------------------------------ */ @Override public String toString() @@ -536,7 +531,7 @@ public abstract class HttpChannel if(_request.getTimeStamp()==0) _request.setTimeStamp(System.currentTimeMillis()); _request.setMethod(httpMethod,method); - + if (httpMethod==HttpMethod.CONNECT) _uri.parseConnect(uri); else @@ -545,7 +540,7 @@ public abstract class HttpChannel _request.setPathInfo(_uri.getDecodedPath()); _version=version==null?HttpVersion.HTTP_0_9:version; _request.setHttpVersion(_version); - + return false; } @@ -576,7 +571,7 @@ public abstract class HttpChannel break; default: - String[] values = value.toString().split(","); + String[] values = value.split(","); for (int i=0;values!=null && i 0) _responseFields.putLongField(HttpHeader.CONTENT_LENGTH, httpContent.getContentLength()); - + String lm = httpContent.getLastModified(); if (lm != null) _responseFields.put(HttpHeader.LAST_MODIFIED, lm); @@ -794,40 +789,34 @@ public abstract class HttpChannel throw new IllegalArgumentException("unknown content type?"); } } - + public abstract HttpConnector getHttpConnector(); - + protected abstract int write(ByteBuffer content) throws IOException; - + /* Called by the channel or application to commit a specific response info */ protected abstract void commit(ResponseInfo info, ByteBuffer content) throws IOException; - + protected abstract int getContentBufferSize(); protected abstract void increaseContentBufferSize(int size); protected abstract void resetBuffer(); - + protected abstract void flushResponse() throws IOException; protected abstract void completeResponse() throws IOException; protected abstract void completed(); - - protected abstract void execute(Runnable task); - - // TODO replace with ScheduledExecutorService? - // TODO constructor inject - public abstract Timer getTimer(); - + protected abstract void execute(Runnable task); + + // TODO use constructor injection ? + public abstract ScheduledExecutorService getScheduler(); /* ------------------------------------------------------------ */ public interface EventHandler extends HttpParser.RequestHandler { ResponseInfo commit(); } - - - } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java index a8687b7cbcd..2e04c2df191 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java @@ -4,20 +4,20 @@ // All rights reserved. This program and the accompanying materials // are made available under the terms of the Eclipse Public License v1.0 // and Apache License v2.0 which accompanies this distribution. -// The Eclipse Public License is available at +// The Eclipse Public License is available at // http://www.eclipse.org/legal/epl-v10.html // The Apache License v2.0 is available at // http://www.opensource.org/licenses/apache2.0.php -// You may elect to redistribute this code under either of these licenses. +// You may elect to redistribute this code under either of these licenses. // ======================================================================== package org.eclipse.jetty.server; import java.util.ArrayList; import java.util.List; -import java.util.Timer; -import java.util.TimerTask; - +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import javax.servlet.AsyncContext; import javax.servlet.AsyncEvent; import javax.servlet.AsyncListener; @@ -39,32 +39,32 @@ import org.eclipse.jetty.util.log.Logger; /* ------------------------------------------------------------ */ /** Implementation of Continuation and AsyncContext interfaces - * + * */ public class HttpChannelState implements AsyncContext, Continuation { private static final Logger LOG = Log.getLogger(HttpChannelState.class); private final static long DEFAULT_TIMEOUT=30000L; - + private final static ContinuationThrowable __exception = new ContinuationThrowable(); - + // STATES: // handling() suspend() unhandle() resume() complete() completed() - // startAsync() dispatch() + // startAsync() dispatch() // IDLE DISPATCHED COMPLETECALLED // DISPATCHED ASYNCSTARTED COMPLETING // ASYNCSTARTED ASYNCWAIT REDISPATCHING COMPLETECALLED - // REDISPATCHING REDISPATCHED + // REDISPATCHING REDISPATCHED // ASYNCWAIT REDISPATCH COMPLETECALLED // REDISPATCH REDISPATCHED // REDISPATCHED ASYNCSTARTED COMPLETING // COMPLETECALLED COMPLETING COMPLETING // COMPLETING COMPLETING COMPLETED - // COMPLETED + // COMPLETED - public enum State - { + public enum State + { IDLE, // Idle request DISPATCHED, // Request dispatched to filter/servlet ASYNCSTARTED, // Suspend called, but not yet returned to container @@ -75,8 +75,8 @@ public class HttpChannelState implements AsyncContext, Continuation COMPLETECALLED,// complete called COMPLETING, // Request is completable COMPLETED // Request is complete - }; - + } + /* ------------------------------------------------------------ */ private final HttpChannel _channel; private List _lastAsyncListeners; @@ -90,9 +90,9 @@ public class HttpChannelState implements AsyncContext, Continuation private boolean _expired; private volatile boolean _responseWrapped; private long _timeoutMs=DEFAULT_TIMEOUT; - private AsyncEventState _event; + private AsyncEventState _event; private volatile boolean _continuation; - + /* ------------------------------------------------------------ */ protected HttpChannelState(HttpChannel channel) { @@ -103,13 +103,13 @@ public class HttpChannelState implements AsyncContext, Continuation /* ------------------------------------------------------------ */ public State getState() - { + { synchronized(this) { return _state; } } - + /* ------------------------------------------------------------ */ @Override public void addListener(AsyncListener listener) @@ -117,7 +117,7 @@ public class HttpChannelState implements AsyncContext, Continuation synchronized(this) { if (_asyncListeners==null) - _asyncListeners=new ArrayList(); + _asyncListeners=new ArrayList<>(); _asyncListeners.add(listener); } } @@ -130,7 +130,7 @@ public class HttpChannelState implements AsyncContext, Continuation { // TODO handle the request/response ??? if (_asyncListeners==null) - _asyncListeners=new ArrayList(); + _asyncListeners=new ArrayList<>(); _asyncListeners.add(listener); } } @@ -142,7 +142,7 @@ public class HttpChannelState implements AsyncContext, Continuation synchronized(this) { if (_continuationListeners==null) - _continuationListeners=new ArrayList(); + _continuationListeners=new ArrayList<>(); _continuationListeners.add(listener); } } @@ -155,7 +155,7 @@ public class HttpChannelState implements AsyncContext, Continuation { _timeoutMs=ms; } - } + } /* ------------------------------------------------------------ */ @Override @@ -165,7 +165,7 @@ public class HttpChannelState implements AsyncContext, Continuation { return _timeoutMs; } - } + } /* ------------------------------------------------------------ */ public AsyncEventState getAsyncEventState() @@ -174,8 +174,8 @@ public class HttpChannelState implements AsyncContext, Continuation { return _event; } - } - + } + /* ------------------------------------------------------------ */ /** * @see org.eclipse.jetty.continuation.Continuation#isResponseWrapped() @@ -198,7 +198,7 @@ public class HttpChannelState implements AsyncContext, Continuation return _initial; } } - + /* ------------------------------------------------------------ */ /* (non-Javadoc) * @see javax.servlet.ServletRequest#isSuspended() @@ -215,9 +215,9 @@ public class HttpChannelState implements AsyncContext, Continuation case COMPLETECALLED: case ASYNCWAIT: return true; - + default: - return false; + return false; } } } @@ -230,7 +230,7 @@ public class HttpChannelState implements AsyncContext, Continuation return _state==State.IDLE; } } - + /* ------------------------------------------------------------ */ public boolean isSuspending() { @@ -241,13 +241,13 @@ public class HttpChannelState implements AsyncContext, Continuation case ASYNCSTARTED: case ASYNCWAIT: return true; - + default: - return false; + return false; } } } - + /* ------------------------------------------------------------ */ public boolean isDispatchable() { @@ -260,9 +260,9 @@ public class HttpChannelState implements AsyncContext, Continuation case REDISPATCHING: case COMPLETECALLED: return true; - + default: - return false; + return false; } } } @@ -299,7 +299,7 @@ public class HttpChannelState implements AsyncContext, Continuation { _continuation=false; _responseWrapped=false; - + switch(_state) { case IDLE: @@ -315,7 +315,7 @@ public class HttpChannelState implements AsyncContext, Continuation _lastAsyncListeners=null; } return true; - + case COMPLETECALLED: _state=State.COMPLETING; return false; @@ -323,7 +323,7 @@ public class HttpChannelState implements AsyncContext, Continuation case COMPLETING: case ASYNCWAIT: return false; - + case REDISPATCH: _state=State.REDISPATCHED; return true; @@ -370,7 +370,7 @@ public class HttpChannelState implements AsyncContext, Continuation throw new IllegalStateException(this.getStatusString()); } } - + if (_lastAsyncListeners!=null) { for (AsyncListener listener : _lastAsyncListeners) @@ -397,13 +397,13 @@ public class HttpChannelState implements AsyncContext, Continuation _event._cause=th; } } - + /* ------------------------------------------------------------ */ /** * Signal that the HttpConnection has finished handling the request. * For blocking connectors, this call may block if the request has * been suspended (startAsync called). - * @return true if handling is complete, false if the request should + * @return true if handling is complete, false if the request should * be handled again (eg because of a resume that happened before unhandle was called) */ protected boolean unhandle() @@ -423,22 +423,22 @@ public class HttpChannelState implements AsyncContext, Continuation case ASYNCSTARTED: _initial=false; _state=State.ASYNCWAIT; - scheduleTimeout(); + scheduleTimeout(); if (_state==State.ASYNCWAIT) return true; else if (_state==State.COMPLETECALLED) { _state=State.COMPLETING; return true; - } + } _initial=false; _state=State.REDISPATCHED; - return false; + return false; case REDISPATCHING: _initial=false; _state=State.REDISPATCHED; - return false; + return false; case COMPLETECALLED: _initial=false; @@ -470,15 +470,15 @@ public class HttpChannelState implements AsyncContext, Continuation _state=State.REDISPATCH; _resumed=true; break; - + case REDISPATCH: return; - + default: throw new IllegalStateException(this.getStatusString()); } } - + if (dispatch) { cancelTimeout(); @@ -507,7 +507,7 @@ public class HttpChannelState implements AsyncContext, Continuation } _expired=true; } - + if (aListeners!=null) { for (AsyncListener listener : aListeners) @@ -536,16 +536,16 @@ public class HttpChannelState implements AsyncContext, Continuation } } } - - - + + + synchronized (this) { switch(_state) { case ASYNCSTARTED: case ASYNCWAIT: - if (_continuation) + if (_continuation) dispatch(); else // TODO maybe error dispatch? @@ -555,7 +555,7 @@ public class HttpChannelState implements AsyncContext, Continuation scheduleDispatch(); } - + /* ------------------------------------------------------------ */ /* (non-Javadoc) * @see javax.servlet.ServletRequest#complete() @@ -577,17 +577,17 @@ public class HttpChannelState implements AsyncContext, Continuation case ASYNCSTARTED: _state=State.COMPLETECALLED; return; - + case ASYNCWAIT: _state=State.COMPLETECALLED; dispatch=!_expired; break; - + default: throw new IllegalStateException(this.getStatusString()); } } - + if (dispatch) { cancelTimeout(); @@ -597,7 +597,7 @@ public class HttpChannelState implements AsyncContext, Continuation /* ------------------------------------------------------------ */ @Override - public T createListener(Class clazz) throws ServletException + public T createListener(Class clazz) throws ServletException { try { @@ -628,14 +628,14 @@ public class HttpChannelState implements AsyncContext, Continuation cListeners=_continuationListeners; aListeners=_asyncListeners; break; - + default: cListeners=null; aListeners=null; throw new IllegalStateException(this.getStatusString()); } } - + if (aListeners!=null) { for (AsyncListener listener : aListeners) @@ -696,8 +696,8 @@ public class HttpChannelState implements AsyncContext, Continuation if (_event!=null) _event._cause=null; } - } - + } + /* ------------------------------------------------------------ */ public void cancel() { @@ -717,12 +717,9 @@ public class HttpChannelState implements AsyncContext, Continuation /* ------------------------------------------------------------ */ protected void scheduleTimeout() { - Timer timer = _channel.getTimer(); - if (timer!=null) - { - _event._timeout= new AsyncTimeout(); - timer.schedule(_event._timeout,_timeoutMs); - } + ScheduledExecutorService scheduler = _channel.getScheduler(); + if (scheduler!=null) + _event._timeout=scheduler.schedule(new AsyncTimeout(),_timeoutMs,TimeUnit.MILLISECONDS); } /* ------------------------------------------------------------ */ @@ -731,9 +728,9 @@ public class HttpChannelState implements AsyncContext, Continuation AsyncEventState event=_event; if (event!=null) { - TimerTask task=event._timeout; + Future task=event._timeout; if (task!=null) - task.cancel(); + task.cancel(false); } } @@ -745,7 +742,7 @@ public class HttpChannelState implements AsyncContext, Continuation return _state==State.COMPLETECALLED; } } - + /* ------------------------------------------------------------ */ boolean isCompleting() { @@ -753,8 +750,8 @@ public class HttpChannelState implements AsyncContext, Continuation { return _state==State.COMPLETING; } - } - + } + /* ------------------------------------------------------------ */ public boolean isCompleted() { @@ -826,7 +823,7 @@ public class HttpChannelState implements AsyncContext, Continuation { return _channel.getRequest(); } - + /* ------------------------------------------------------------ */ @Override public ServletRequest getRequest() @@ -917,7 +914,7 @@ public class HttpChannelState implements AsyncContext, Continuation { dispatch(); } - + /* ------------------------------------------------------------ */ @@ -936,7 +933,7 @@ public class HttpChannelState implements AsyncContext, Continuation } } - + /* ------------------------------------------------------------ */ /** * @see Continuation#suspend() @@ -946,7 +943,7 @@ public class HttpChannelState implements AsyncContext, Continuation { _continuation=true; _responseWrapped=!(response instanceof Response); - doSuspend(_channel.getRequest().getServletContext(),_channel.getRequest(),response); + doSuspend(_channel.getRequest().getServletContext(),_channel.getRequest(),response); } /* ------------------------------------------------------------ */ @@ -958,7 +955,7 @@ public class HttpChannelState implements AsyncContext, Continuation { _responseWrapped=false; _continuation=true; - doSuspend(_channel.getRequest().getServletContext(),_channel.getRequest(),_channel.getResponse()); + doSuspend(_channel.getRequest().getServletContext(),_channel.getRequest(),_channel.getResponse()); } /* ------------------------------------------------------------ */ @@ -1022,7 +1019,7 @@ public class HttpChannelState implements AsyncContext, Continuation /* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */ - public class AsyncTimeout extends TimerTask + public class AsyncTimeout implements Runnable { @Override public void run() @@ -1035,25 +1032,25 @@ public class HttpChannelState implements AsyncContext, Continuation /* ------------------------------------------------------------ */ public class AsyncEventState extends AsyncEvent { - private TimerTask _timeout; + private Future _timeout; private final ServletContext _suspendedContext; private ServletContext _dispatchContext; private String _pathInContext; private Throwable _cause; - + public AsyncEventState(ServletContext context, ServletRequest request, ServletResponse response) { super(HttpChannelState.this, request,response); _suspendedContext=context; // Get the base request So we can remember the initial paths Request r=_channel.getRequest(); - + // If we haven't been async dispatched before if (r.getAttribute(AsyncContext.ASYNC_REQUEST_URI)==null) { - // We are setting these attributes during startAsync, when the spec implies that + // We are setting these attributes during startAsync, when the spec implies that // they are only available after a call to AsyncContext.dispatch(...); - + // have we been forwarded before? String uri=(String)r.getAttribute(Dispatcher.FORWARD_REQUEST_URI); if (uri!=null) @@ -1074,22 +1071,22 @@ public class HttpChannelState implements AsyncContext, Continuation } } } - + public ServletContext getSuspendedContext() { return _suspendedContext; } - + public ServletContext getDispatchContext() { return _dispatchContext; } - + public ServletContext getServletContext() { return _dispatchContext==null?_suspendedContext:_dispatchContext; } - + /* ------------------------------------------------------------ */ /** * @return The path in the context @@ -1099,11 +1096,11 @@ public class HttpChannelState implements AsyncContext, Continuation return _pathInContext; } } - + private final Runnable _handleRequest = new Runnable() { @Override - public void run() + public void run() { _channel.handle(); } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java index ad29478c75d..80952725750 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java @@ -15,9 +15,9 @@ package org.eclipse.jetty.server; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.Timer; import java.util.concurrent.ExecutionException; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; import org.eclipse.jetty.http.HttpGenerator; import org.eclipse.jetty.http.HttpGenerator.Action; @@ -41,7 +41,7 @@ public class HttpConnection extends AbstractAsyncConnection { public static final Logger LOG = Log.getLogger(HttpConnection.class); - private static final ThreadLocal __currentConnection = new ThreadLocal(); + private static final ThreadLocal __currentConnection = new ThreadLocal<>(); public static final String UPGRADE_CONNECTION_ATTR = "org.eclispe.jetty.server.HttpConnection.UPGRADE"; @@ -76,17 +76,12 @@ public class HttpConnection extends AbstractAsyncConnection } /* ------------------------------------------------------------ */ - /** Constructor - * - */ public HttpConnection(HttpConnector connector, AsyncEndPoint endpoint, Server server) { super(endpoint,connector.findExecutor()); _connector = connector; _bufferPool=_connector.getByteBufferPool(); - if (_bufferPool==null) - new Throwable().printStackTrace(); _server = server; @@ -696,9 +691,9 @@ public class HttpConnection extends AbstractAsyncConnection } @Override - public Timer getTimer() + public ScheduledExecutorService getScheduler() { - return _connector.getTimer(); + return _connector.getScheduler(); } @Override @@ -747,7 +742,7 @@ public class HttpConnection extends AbstractAsyncConnection return fcb; } - }; + } private class HttpHttpInput extends HttpInput { diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnector.java index 5728ccfe1df..7f1095f43e4 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnector.java @@ -2,8 +2,9 @@ package org.eclipse.jetty.server; import java.io.IOException; import java.net.InetSocketAddress; -import java.util.Timer; - +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; import javax.servlet.ServletRequest; import org.eclipse.jetty.http.HttpFields; @@ -11,7 +12,7 @@ import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.http.HttpScheme; -public abstract class HttpConnector extends AbstractConnector +public abstract class HttpConnector extends AbstractNetConnector { private String _integralScheme = HttpScheme.HTTPS.asString(); private int _integralPort = 0; @@ -19,52 +20,65 @@ public abstract class HttpConnector extends AbstractConnector private int _confidentialPort = 0; private boolean _forwarded; private String _hostHeader; - private Timer _timer = new Timer(true); - + private ScheduledExecutorService _scheduler; + private boolean _shutdownScheduler; private String _forwardedHostHeader = HttpHeader.X_FORWARDED_HOST.toString(); private String _forwardedServerHeader = HttpHeader.X_FORWARDED_SERVER.toString(); private String _forwardedForHeader = HttpHeader.X_FORWARDED_FOR.toString(); private String _forwardedProtoHeader = HttpHeader.X_FORWARDED_PROTO.toString(); private String _forwardedCipherSuiteHeader; private String _forwardedSslSessionIdHeader; - - private int _requestHeaderSize=6*1024;; + private int _requestHeaderSize=6*1024; private int _requestBufferSize=16*1024; private int _responseHeaderSize=6*1024; private int _responseBufferSize=16*1024; - public HttpConnector() { - super(); } public HttpConnector(int acceptors) { super(acceptors); } - + + public ScheduledExecutorService getScheduler() + { + return _scheduler; + } + + public void setScheduler(ScheduledExecutorService scheduler) + { + _scheduler = scheduler; + } + @Override protected void doStart() throws Exception { super.doStart(); - _timer=new Timer("Timer-"+getName(),true); + if (_scheduler == null) + { + _scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() + { + @Override + public Thread newThread(Runnable r) + { + return new Thread(r, "Timer-" + HttpConnector.this.getName()); + } + }); + _shutdownScheduler = true; + } } @Override protected void doStop() throws Exception { - if (_timer!=null) - _timer.cancel(); - _timer=null; + if (_shutdownScheduler) + _scheduler.shutdownNow(); + _scheduler = null; super.doStop(); } - public Timer getTimer() - { - return _timer; - } - public int getRequestHeaderSize() { return _requestHeaderSize; @@ -307,12 +321,11 @@ public abstract class HttpConnector extends AbstractConnector * Set reverse proxy handling. If set to true, then the X-Forwarded headers (or the headers set in their place) are looked for to set the request protocol, * host, server and client ip. * - * @param check - * true if this connector is checking the x-forwarded-for/host/server headers - * @set {@link #setForwardedForHeader(String)} - * @set {@link #setForwardedHostHeader(String)} - * @set {@link #setForwardedProtoHeader(String)} - * @set {@link #setForwardedServerHeader(String)} + * @param check true if this connector is checking the x-forwarded-for/host/server headers + * @see #setForwardedForHeader(String) + * @see #setForwardedHostHeader(String) + * @see #setForwardedProtoHeader(String) + * @see #setForwardedServerHeader(String) */ public void setForwarded(boolean check) { @@ -384,6 +397,7 @@ public abstract class HttpConnector extends AbstractConnector /* ------------------------------------------------------------ */ /** + * @return the forwarded for header * @see #setForwarded(boolean) */ public String getForwardedForHeader() @@ -464,6 +478,4 @@ public abstract class HttpConnector extends AbstractConnector { _forwardedSslSessionIdHeader = forwardedSslSessionId; } - - } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/LocalHttpConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/LocalHttpConnector.java index c6ee3d37e0f..67b68905beb 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/LocalHttpConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/LocalHttpConnector.java @@ -15,14 +15,11 @@ package org.eclipse.jetty.server; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.Timer; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.Phaser; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.eclipse.jetty.io.AsyncByteArrayEndPoint; @@ -34,9 +31,9 @@ import org.eclipse.jetty.util.log.Logger; public class LocalHttpConnector extends HttpConnector { private static final Logger LOG = Log.getLogger(LocalHttpConnector.class); - private final BlockingQueue _connects = new LinkedBlockingQueue(); - private ScheduledExecutorService _timer; - private LocalExecutor _executor; + + private final BlockingQueue _connects = new LinkedBlockingQueue<>(); + private volatile LocalExecutor _executor; public LocalHttpConnector() { @@ -49,12 +46,12 @@ public class LocalHttpConnector extends HttpConnector return this; } - /** Sends requests and get's responses based on thread activity. + /** Sends requests and get responses based on thread activity. * Returns all the responses received once the thread activity has * returned to the level it was before the requests. - * @param requests - * @return - * @throws Exception + * @param requests the requests + * @return the responses + * @throws Exception if the requests fail */ public String getResponses(String requests) throws Exception { @@ -65,6 +62,9 @@ public class LocalHttpConnector extends HttpConnector /** Sends requests and get's responses based on thread activity. * Returns all the responses received once the thread activity has * returned to the level it was before the requests. + * @param requestsBuffer the requests + * @return the responses + * @throws Exception if the requests fail */ public ByteBuffer getResponses(ByteBuffer requestsBuffer) throws Exception { @@ -81,8 +81,8 @@ public class LocalHttpConnector extends HttpConnector /** * Execute a request and return the EndPoint through which * responses can be received. - * @param rawRequest - * @return + * @param rawRequest the request + * @return the local endpoint */ public LocalEndPoint executeRequest(String rawRequest) { @@ -111,7 +111,6 @@ public class LocalHttpConnector extends HttpConnector protected void doStart() throws Exception { super.doStart(); - _timer=new ScheduledThreadPoolExecutor(1); _executor=new LocalExecutor(findExecutor()); } @@ -119,7 +118,6 @@ public class LocalHttpConnector extends HttpConnector protected void doStop() throws Exception { super.doStop(); - _timer.shutdownNow(); _executor=null; } @@ -175,9 +173,8 @@ public class LocalHttpConnector extends HttpConnector public LocalEndPoint() { - super(_timer); + super(getScheduler(), LocalHttpConnector.this.getIdleTimeout()); setGrowOutput(true); - setIdleTimeout(LocalHttpConnector.this.getIdleTimeout()); } public void addInput(String s) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/SelectChannelConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/SelectChannelConnector.java index 220a07943e4..b4ad24a6fe2 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/SelectChannelConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/SelectChannelConnector.java @@ -50,8 +50,6 @@ import org.eclipse.jetty.server.Connector.NetConnector; * thus if possible it should be read after the continuation or saved as a request attribute or as the * associated object of the Continuation instance. *

    - * - * @org.apache.xbean.XBean element="nioConnector" description="Creates an NIO based socket connector" */ public class SelectChannelConnector extends HttpConnector implements NetConnector { @@ -96,22 +94,15 @@ public class SelectChannelConnector extends HttpConnector implements NetConnecto } @Override - public void close() + public void close() throws IOException { synchronized(this) { if (_acceptChannel != null) { removeBean(_acceptChannel); - try - { - if (_acceptChannel.isOpen()) - _acceptChannel.close(); - } - catch(IOException e) - { - LOG.warn(e); - } + if (_acceptChannel.isOpen()) + _acceptChannel.close(); } _acceptChannel = null; _localPort=-2; @@ -182,7 +173,7 @@ public class SelectChannelConnector extends HttpConnector implements NetConnecto protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException { - return new SelectChannelEndPoint(channel,selectSet,key, this._idleTimeout); + return new SelectChannelEndPoint(channel,selectSet,key, getScheduler(), getIdleTimeout()); } protected void endPointClosed(AsyncEndPoint endpoint) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/nio/NetworkTrafficSelectChannelConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/nio/NetworkTrafficSelectChannelConnector.java index 9f9dc670055..7facf5e70d7 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/nio/NetworkTrafficSelectChannelConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/nio/NetworkTrafficSelectChannelConnector.java @@ -55,7 +55,7 @@ public class NetworkTrafficSelectChannelConnector extends SelectChannelConnector @Override protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.ManagedSelector selectSet, SelectionKey key) throws IOException { - NetworkTrafficSelectChannelEndPoint endPoint = new NetworkTrafficSelectChannelEndPoint(channel, selectSet, key, getIdleTimeout(), listeners); + NetworkTrafficSelectChannelEndPoint endPoint = new NetworkTrafficSelectChannelEndPoint(channel, selectSet, key, getScheduler(), getIdleTimeout(), listeners); endPoint.notifyOpened(); return endPoint; } diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/HttpWriterTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/HttpWriterTest.java index d85393469f8..4e0ff6b2d9a 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/HttpWriterTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/HttpWriterTest.java @@ -12,11 +12,9 @@ package org.eclipse.jetty.server; //You may elect to redistribute this code under either of these licenses. //======================================================================== -import static org.junit.Assert.assertEquals; - import java.io.IOException; import java.nio.ByteBuffer; -import java.util.Timer; +import java.util.concurrent.ScheduledExecutorService; import org.eclipse.jetty.http.HttpGenerator.ResponseInfo; import org.eclipse.jetty.util.BufferUtil; @@ -26,6 +24,8 @@ import org.eclipse.jetty.util.Utf8StringBuilder; import org.junit.Before; import org.junit.Test; +import static org.junit.Assert.assertEquals; + public class HttpWriterTest { private HttpWriter _writer; @@ -94,13 +94,13 @@ public class HttpWriterTest } @Override - public Timer getTimer() + public ScheduledExecutorService getScheduler() { return null; } - + }; - + HttpOutput httpOut = new HttpOutput(channel); _writer = new HttpWriter(httpOut); } @@ -120,7 +120,7 @@ public class HttpWriterTest _writer.write("How now \uFF22rown cow"); assertArrayEquals("How now \uFF22rown cow".getBytes(StringUtil.__UTF8),BufferUtil.toArray(_bytes)); } - + @Test public void testNotCESU8() throws Exception { @@ -130,11 +130,11 @@ public class HttpWriterTest assertEquals("787878F0909080787878",TypeUtil.toHexString(BufferUtil.toArray(_bytes))); assertArrayEquals(data.getBytes(StringUtil.__UTF8),BufferUtil.toArray(_bytes)); assertEquals(3+4+3,_bytes.remaining()); - + Utf8StringBuilder buf = new Utf8StringBuilder(); buf.append(BufferUtil.toArray(_bytes),0,_bytes.remaining()); assertEquals(data,buf.toString()); - + } @Test @@ -203,7 +203,7 @@ public class HttpWriterTest final String singleByteStr = "a"; int remainSize = 1; - final String multiByteDuplicateStr = "\uD842\uDF9F"; + final String multiByteDuplicateStr = "\uD842\uDF9F"; int adjustSize = -1; StringBuilder sb = new StringBuilder(); @@ -233,7 +233,7 @@ public class HttpWriterTest assertArrayEquals(bytes,BufferUtil.toArray(_bytes)); assertArrayEquals(baos.toByteArray(),BufferUtil.toArray(_bytes)); } - + @Test public void testMultiByteOverflowUTF16x2_2() throws Exception { @@ -241,8 +241,8 @@ public class HttpWriterTest final String singleByteStr = "a"; int remainSize = 1; - final String multiByteDuplicateStr = "\uD842\uDF9F"; - int adjustSize = -2; + final String multiByteDuplicateStr = "\uD842\uDF9F"; + int adjustSize = -2; StringBuilder sb = new StringBuilder(); for (int i = 0; i < HttpWriter.MAX_OUTPUT_CHARS + adjustSize; i++) diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/ResponseTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/ResponseTest.java index f7a9c48a32c..286fb7454b3 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/ResponseTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/ResponseTest.java @@ -13,11 +13,6 @@ package org.eclipse.jetty.server; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - import java.io.IOException; import java.io.InputStreamReader; import java.io.LineNumberReader; @@ -26,11 +21,9 @@ import java.net.Socket; import java.nio.ByteBuffer; import java.util.Iterator; import java.util.Locale; -import java.util.Timer; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; - import javax.servlet.ServletException; import javax.servlet.http.Cookie; import javax.servlet.http.HttpServletRequest; @@ -52,6 +45,11 @@ import org.junit.Before; import org.junit.Ignore; import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + /** * */ @@ -71,8 +69,8 @@ public class ResponseTest _server.setHandler(new DumpHandler()); _server.start(); _timer=new ScheduledThreadPoolExecutor(1); - - AsyncByteArrayEndPoint endp = new AsyncByteArrayEndPoint(_timer); + + AsyncByteArrayEndPoint endp = new AsyncByteArrayEndPoint(_timer,5000); HttpInput input = new HttpInput(); AsyncConnection connection = new AbstractAsyncConnection(endp,new Executor() { @@ -114,7 +112,7 @@ public class ResponseTest } @Override - public Timer getTimer() + public ScheduledExecutorService getScheduler() { // TODO Auto-generated method stub return null; diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/SelectChannelStatisticsTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/SelectChannelStatisticsTest.java index 41c87ac07bf..04b2f0b1876 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/SelectChannelStatisticsTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/SelectChannelStatisticsTest.java @@ -13,9 +13,6 @@ package org.eclipse.jetty.server; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; @@ -23,7 +20,6 @@ import java.io.PrintWriter; import java.net.Socket; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; - import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -40,12 +36,15 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + public class SelectChannelStatisticsTest { private static final Logger LOG = Log.getLogger(SelectChannelStatisticsTest.class); private static Server _server; - private static AbstractConnector _connector; + private static AbstractNetConnector _connector; private static CyclicBarrier _connect; private static CountDownLatch _closed; @@ -132,7 +131,7 @@ public class SelectChannelStatisticsTest { _connector.getStatistics().start(); } - + @After public void tini() throws Exception { diff --git a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/WebSocketClientFactory.java b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/WebSocketClientFactory.java index 8fedb8932ba..484fc92a824 100644 --- a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/WebSocketClientFactory.java +++ b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/WebSocketClientFactory.java @@ -41,53 +41,55 @@ import org.eclipse.jetty.websocket.driver.WebSocketEventDriver; public class WebSocketClientFactory extends AggregateLifeCycle { private static final Logger LOG = Log.getLogger(WebSocketClientFactory.class); - /** - * Have the factory maintain 1 and only 1 scheduler. All connections share this scheduler. - */ - private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); private final Queue connections = new ConcurrentLinkedQueue<>(); private final ByteBufferPool bufferPool = new StandardByteBufferPool(); private final Executor executor; + private final ScheduledExecutorService scheduler; private final WebSocketClientSelectorManager selector; private final EventMethodsCache methodsCache; private final WebSocketPolicy policy; public WebSocketClientFactory() { - this(new QueuedThreadPool(),null); + this(new QueuedThreadPool()); } public WebSocketClientFactory(Executor threadPool) { - this(threadPool,null); - } - - public WebSocketClientFactory(Executor executor, SslContextFactory sslContextFactory) - { - if (executor == null) - { - throw new IllegalArgumentException("Executor is required"); - } - this.executor = executor; - addBean(executor); - - if (sslContextFactory != null) - { - addBean(sslContextFactory); - } - - this.policy = WebSocketPolicy.newClientPolicy(); - - selector = new WebSocketClientSelectorManager(bufferPool,executor,policy); - selector.setSslContextFactory(sslContextFactory); - addBean(selector); - - this.methodsCache = new EventMethodsCache(); + this(threadPool, Executors.newSingleThreadScheduledExecutor()); } public WebSocketClientFactory(SslContextFactory sslContextFactory) { - this(null,sslContextFactory); + this(new QueuedThreadPool(), Executors.newSingleThreadScheduledExecutor(), sslContextFactory); + } + + public WebSocketClientFactory(Executor threadPool, ScheduledExecutorService scheduler) + { + this(threadPool, scheduler, null); + } + + public WebSocketClientFactory(Executor executor, ScheduledExecutorService scheduler, SslContextFactory sslContextFactory) + { + if (executor == null) + throw new IllegalArgumentException("Executor is required"); + this.executor = executor; + addBean(executor); + + if (scheduler == null) + throw new IllegalArgumentException("Scheduler is required"); + this.scheduler = scheduler; + + if (sslContextFactory != null) + addBean(sslContextFactory); + + this.policy = WebSocketPolicy.newClientPolicy(); + + selector = new WebSocketClientSelectorManager(bufferPool, executor, scheduler, policy); + selector.setSslContextFactory(sslContextFactory); + addBean(selector); + + this.methodsCache = new EventMethodsCache(); } private void closeConnections() @@ -150,6 +152,6 @@ public class WebSocketClientFactory extends AggregateLifeCycle public WebSocketEventDriver newWebSocketDriver(Object websocketPojo) { - return new WebSocketEventDriver(websocketPojo,methodsCache,policy,getBufferPool()); + return new WebSocketEventDriver(websocketPojo, methodsCache, policy, getBufferPool()); } } diff --git a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientSelectorManager.java b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientSelectorManager.java index 64124c85d20..fedfba91c74 100644 --- a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientSelectorManager.java +++ b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientSelectorManager.java @@ -39,16 +39,18 @@ import org.eclipse.jetty.websocket.io.WebSocketAsyncConnection; public class WebSocketClientSelectorManager extends SelectorManager { - private SslContextFactory sslContextFactory; private final Executor executor; + private final ScheduledExecutorService scheduler; private final WebSocketPolicy policy; private final ByteBufferPool bufferPool; + private SslContextFactory sslContextFactory; - public WebSocketClientSelectorManager(ByteBufferPool bufferPool, Executor executor, WebSocketPolicy policy) + public WebSocketClientSelectorManager(ByteBufferPool bufferPool, Executor executor, ScheduledExecutorService scheduler, WebSocketPolicy policy) { super(); this.bufferPool = bufferPool; this.executor = executor; + this.scheduler = scheduler; this.policy = policy; } @@ -135,7 +137,7 @@ public class WebSocketClientSelectorManager extends SelectorManager @Override protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey selectionKey) throws IOException { - return new SelectChannelEndPoint(channel,selectSet, selectionKey, policy.getIdleTimeout()); + return new SelectChannelEndPoint(channel,selectSet, selectionKey, scheduler, policy.getIdleTimeout()); } public SSLEngine newSSLEngine(SslContextFactory sslContextFactory, SocketChannel channel) diff --git a/tests/test-integration/src/test/resources/RFC2616Base.xml b/tests/test-integration/src/test/resources/RFC2616Base.xml index 27b6a4bbe9b..b9da785c11d 100644 --- a/tests/test-integration/src/test/resources/RFC2616Base.xml +++ b/tests/test-integration/src/test/resources/RFC2616Base.xml @@ -57,30 +57,30 @@ - /tests - - - VirtualHost - - - /virtualhost - - virtual + /tests + + + VirtualHost + + + /virtualhost + + virtual /tests /default - - default + + default /echo - - echo + + echo
    @@ -88,20 +88,20 @@
    - - - - - /webapp-contexts/RFC2616 - 0 - - - /testable-jetty-server-config.properties - - - - - + + + + + /webapp-contexts/RFC2616 + 0 + + + /testable-jetty-server-config.properties + + + + + @@ -122,9 +122,9 @@ /webapps - false - true - false + false + true + false /webdefault.xml org.eclipse.jetty.server.webapp.ContainerIncludeJarPattern