Jetty9 - Improved idle timeout handling.

Idle timeouts are not enforced anymore by polling the AsyncEndPoints from the SelectorManager,
but instead the AsyncEndPoints now take a SchedulerExecutorService as parameter and perform
their own enforcing of the idle timeout.

Also removed a few Timer usages (replaced by SchedulerExecutorService) and fixed XML files
referencing old APIs.
This commit is contained in:
Simone Bordet 2012-07-24 10:38:28 +02:00
parent 88f4987721
commit 822abe514e
30 changed files with 812 additions and 883 deletions

View File

@ -25,7 +25,7 @@
<Item>org.eclipse.jetty.servlet.DefaultServlet</Item>
</Array>
<Call name="addLifeCycle">
<Call name="addBean">
<Arg>
<New id="DeploymentManager" class="org.eclipse.jetty.deploy.DeploymentManager">
<Set name="contexts">

View File

@ -3,7 +3,7 @@
<Configure id="Server" class="org.eclipse.jetty.server.Server">
<Call name="addLifeCycle">
<Call name="addBean">
<Arg>
<New id="DeploymentManager" class="org.eclipse.jetty.deploy.DeploymentManager">
<Set name="contexts">

View File

@ -7,7 +7,7 @@ public abstract class AbstractEndPoint implements EndPoint
private final long _created=System.currentTimeMillis();
private final InetSocketAddress _local;
private final InetSocketAddress _remote;
private volatile long _maxIdleTime;
private volatile long _idleTimeout;
private volatile long _idleTimestamp=System.currentTimeMillis();
@ -27,42 +27,37 @@ public abstract class AbstractEndPoint implements EndPoint
@Override
public long getIdleTimeout()
{
return _maxIdleTime;
return _idleTimeout;
}
@Override
public void setIdleTimeout(long timeMs)
public void setIdleTimeout(long idleTimeout)
{
_maxIdleTime=timeMs;
_idleTimeout = idleTimeout;
}
/* ------------------------------------------------------------ */
@Override
public InetSocketAddress getLocalAddress()
{
return _local;
}
/* ------------------------------------------------------------ */
@Override
public InetSocketAddress getRemoteAddress()
{
return _remote;
}
/* ------------------------------------------------------------ */
public long getIdleTimestamp()
{
return _idleTimestamp;
}
/* ------------------------------------------------------------ */
protected void notIdle()
{
_idleTimestamp=System.currentTimeMillis();
}
/* ------------------------------------------------------------ */
@Override
public String toString()
{
@ -74,5 +69,4 @@ public abstract class AbstractEndPoint implements EndPoint
isOpen(),
isOutputShutdown());
}
}

View File

@ -3,35 +3,20 @@ package org.eclipse.jetty.io;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEndPoint
public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEndPoint, Runnable
{
private static final int TICK=Integer.getInteger("org.eclipse.jetty.io.AsyncByteArrayEndPoint.TICK",100);
public static final Logger LOG=Log.getLogger(AsyncByteArrayEndPoint.class);
private final ScheduledExecutorService _timer;
private AsyncConnection _connection;
private final Runnable _checkTimeout=new Runnable()
{
@Override
public void run()
{
if (isOpen())
{
checkTimeout(System.currentTimeMillis());
if (isOpen())
_timer.schedule(_checkTimeout,TICK,TimeUnit.MILLISECONDS);
}
}
};
private static final Logger LOG = Log.getLogger(AsyncByteArrayEndPoint.class);
private final ReadInterest _readInterest = new ReadInterest()
{
@ -43,7 +28,6 @@ public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEn
return _in == null || BufferUtil.hasContent(_in);
}
};
private final WriteFlusher _writeFlusher = new WriteFlusher(this)
{
@Override
@ -52,26 +36,73 @@ public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEn
// Don't need to do anything here as takeOutput does the signalling.
}
};
private final AtomicReference<Future<?>> _timeout = new AtomicReference<>();
private final ScheduledExecutorService _scheduler;
private volatile AsyncConnection _connection;
public AsyncByteArrayEndPoint(ScheduledExecutorService timer)
public AsyncByteArrayEndPoint(ScheduledExecutorService scheduler, long idleTimeout)
{
super();
_timer=timer;
_timer.schedule(_checkTimeout,TICK,TimeUnit.MILLISECONDS);
_scheduler = scheduler;
setIdleTimeout(idleTimeout);
}
public AsyncByteArrayEndPoint(ScheduledExecutorService timer, byte[] input, int outputSize)
public AsyncByteArrayEndPoint(ScheduledExecutorService timer, long idleTimeout, byte[] input, int outputSize)
{
super(input, outputSize);
_timer=timer;
_timer.schedule(_checkTimeout,TICK,TimeUnit.MILLISECONDS);
_scheduler = timer;
setIdleTimeout(idleTimeout);
}
public AsyncByteArrayEndPoint(ScheduledExecutorService timer, String input, int outputSize)
public AsyncByteArrayEndPoint(ScheduledExecutorService timer, long idleTimeout, String input, int outputSize)
{
super(input, outputSize);
_timer=timer;
_timer.schedule(_checkTimeout,TICK,TimeUnit.MILLISECONDS);
_scheduler = timer;
setIdleTimeout(idleTimeout);
}
@Override
public void setIdleTimeout(long idleTimeout)
{
super.setIdleTimeout(idleTimeout);
scheduleIdleTimeout(idleTimeout);
}
private void scheduleIdleTimeout(long delay)
{
Future<?> newTimeout = isOpen() && delay > 0 ? _scheduler.schedule(this, delay, TimeUnit.MILLISECONDS) : null;
Future<?> oldTimeout = _timeout.getAndSet(newTimeout);
if (oldTimeout != null)
oldTimeout.cancel(false);
}
@Override
public void run()
{
if (isOpen())
{
long idleTimestamp = getIdleTimestamp();
long idleTimeout = getIdleTimeout();
long idleElapsed = System.currentTimeMillis() - idleTimestamp;
long idleLeft = idleTimeout - idleElapsed;
if (isOutputShutdown() || _readInterest.isInterested() || _writeFlusher.isWriting())
{
if (idleTimestamp != 0 && idleTimeout > 0)
{
if (idleLeft < 0)
{
if (isOutputShutdown())
close();
notIdle();
TimeoutException timeout = new TimeoutException("Idle timeout expired: " + idleElapsed + "/" + idleTimeout + " ms");
_readInterest.failed(timeout);
_writeFlusher.failed(timeout);
}
}
}
scheduleIdleTimeout(idleLeft > 0 ? idleLeft : idleTimeout);
}
}
@Override
@ -129,35 +160,6 @@ public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEn
_connection = connection;
}
@Override
public void checkTimeout(long now)
{
synchronized (this)
{
if (isOutputShutdown() || _readInterest.isInterested() || _writeFlusher.isWriting())
{
long idleTimestamp = getIdleTimestamp();
long idleTimeout = getIdleTimeout();
if (idleTimestamp != 0 && idleTimeout > 0)
{
long idleForMs = now - idleTimestamp;
if (idleForMs > idleTimeout)
{
if (isOutputShutdown())
close();
notIdle();
TimeoutException timeout = new TimeoutException("idle "+idleForMs+"ms");
_readInterest.failed(timeout);
_writeFlusher.failed(timeout);
}
}
}
}
}
@Override
public void onOpen()
{

View File

@ -21,30 +21,30 @@ import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.ExecutorCallback;
import org.eclipse.jetty.util.FutureCallback;
/* ------------------------------------------------------------ */
/**Asynchronous End Point
* <p>
* This extension of EndPoint provides asynchronous scheduling methods.
* The design of these has been influenced by NIO.2 Futures and Completion
* handlers, but does not use those actual interfaces because: they have
* some inefficiencies.
* <p>
* This class will frequently be used in conjunction with some of the utility
/**
* <p>{@link AsyncEndPoint} add asynchronous scheduling methods to {@link EndPoint}.</p>
* <p>The design of these has been influenced by NIO.2 Futures and Completion
* handlers, but does not use those actual interfaces because they have
* some inefficiencies.</p>
* <p>This class will frequently be used in conjunction with some of the utility
* implementations of {@link Callback}, such as {@link FutureCallback} and
* {@link ExecutorCallback}. Examples are:
* {@link ExecutorCallback}. Examples are:</p>
*
* <h3>Blocking Read</h3>
* A FutureCallback can be used to block until an endpoint is ready to be filled
* <p>A FutureCallback can be used to block until an endpoint is ready to be filled
* from:
* <blockquote><pre>
* FutureCallback<String> future = new FutureCallback<>();
* FutureCallback&lt;String&gt; future = new FutureCallback&lt;&gt;();
* endpoint.fillInterested("ContextObj",future);
* ...
* String context = future.get(); // This blocks
* int filled=endpoint.fill(mybuffer);</pre></blockquote>
* int filled=endpoint.fill(mybuffer);
* </pre></blockquote></p>
*
* <h3>Dispatched Read</h3>
* By using a different callback, the read can be done asynchronously in its own dispatched thread:
* <p>By using a different callback, the read can be done asynchronously in its own dispatched thread:
* <blockquote><pre>
* endpoint.fillInterested("ContextObj",new ExecutorCallback<String>(executor)
* endpoint.fillInterested("ContextObj",new ExecutorCallback&lt;String&gt;(executor)
* {
* public void onCompleted(String context)
* {
@ -52,25 +52,25 @@ import org.eclipse.jetty.util.FutureCallback;
* ...
* }
* public void onFailed(String context,Throwable cause) {...}
* });</pre></blockquote>
* The executor callback can also be customized to not dispatch in some circumstances when
* it knows it can use the callback thread and does not need to dispatch.
* });
* </pre></blockquote></p>
* <p>The executor callback can also be customized to not dispatch in some circumstances when
* it knows it can use the callback thread and does not need to dispatch.</p>
*
* <h3>Blocking Write</h3>
* The write contract is that the callback complete is not called until all data has been
* <p>The write contract is that the callback complete is not called until all data has been
* written or there is a failure. For blocking this looks like:
*
* <blockquote><pre>
* FutureCallback<String> future = new FutureCallback<>();
* FutureCallback&lt;String&gt; future = new FutureCallback&lt;&gt;();
* endpoint.write("ContextObj",future,headerBuffer,contentBuffer);
* String context = future.get(); // This blocks
* </pre></blockquote>
* </pre></blockquote></p>
*
* <h3>Dispatched Write</h3>
* Note also that multiple buffers may be passed in write so that gather writes
* <p>Note also that multiple buffers may be passed in write so that gather writes
* can be done:
* <blockquote><pre>
* endpoint.write("ContextObj",new ExecutorCallback<String>(executor)
* endpoint.write("ContextObj",new ExecutorCallback&lt;String&gt;(executor)
* {
* public void onCompleted(String context)
* {
@ -78,45 +78,52 @@ import org.eclipse.jetty.util.FutureCallback;
* ...
* }
* public void onFailed(String context,Throwable cause) {...}
* },headerBuffer,contentBuffer);</pre></blockquote>
*
* },headerBuffer,contentBuffer);
* </pre></blockquote></p>
*/
public interface AsyncEndPoint extends EndPoint
{
/** Asynchronous a fillable notification.
* <p>
* This method schedules a callback operations when a call to {@link #fill(ByteBuffer)} will return data or EOF.
* @param context Context to return via the callback
* @param callback The callback to call when an error occurs or we are readable.
/**
* <p>Requests callback methods to be invoked when a call to {@link #fill(ByteBuffer)} would return data or EOF.</p>
*
* @param context the context to return via the callback
* @param callback the callback to call when an error occurs or we are readable.
* @throws ReadPendingException if another read operation is concurrent.
*/
<C> void fillInterested(C context, Callback<C> callback) throws ReadPendingException;
/** Asynchronous write operation.
* <p>
* This method performs {@link #flush(ByteBuffer...)} operation(s) and do a callback when all the data
* has been flushed or an error occurs.
* @param context Context to return via the callback
* @param callback The callback to call when an error occurs or we are readable.
* @param buffers One or more {@link ByteBuffer}s that will be flushed.
/**
* <p>Writes the given buffers via {@link #flush(ByteBuffer...)} and invokes callback methods when either
* all the data has been flushed or an error occurs.</p>
*
* @param context the context to return via the callback
* @param callback the callback to call when an error occurs or the write completed.
* @param buffers one or more {@link ByteBuffer}s that will be flushed.
* @throws WritePendingException if another write operation is concurrent.
*/
<C> void write(C context, Callback<C> callback, ByteBuffer... buffers) throws WritePendingException;
/**
* @return Timestamp in ms since epoch of when the last data was
* filled or flushed from this endpoint.
* @return the {@link AsyncConnection} associated with this {@link AsyncEndPoint}
* @see #setAsyncConnection(AsyncConnection)
*/
long getIdleTimestamp();
AsyncConnection getAsyncConnection();
/**
* @param connection the {@link AsyncConnection} associated with this {@link AsyncEndPoint}
* @see #getAsyncConnection()
*/
void setAsyncConnection(AsyncConnection connection);
/**
* <p>Callback method invoked when this {@link AsyncEndPoint} is opened.</p>
* @see #onClose()
*/
void onOpen();
/**
* <p>Callback method invoked when this {@link AsyncEndPoint} is close.</p>
* @see #onOpen()
*/
void onClose();
void checkTimeout(long now);
}

View File

@ -119,10 +119,10 @@ public interface EndPoint
long getIdleTimeout();
/* ------------------------------------------------------------ */
/** Set the max idle time.
* @param timeMs the max idle time in MS. Timeout <= 0 implies an infinite timeout
/** Set the idle timeout.
* @param idleTimeout the idle timeout in MS. Timeout <= 0 implies an infinite timeout
*/
void setIdleTimeout(long timeMs);
void setIdleTimeout(long idleTimeout);

View File

@ -18,6 +18,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -28,9 +29,9 @@ public class NetworkTrafficSelectChannelEndPoint extends SelectChannelEndPoint
private final List<NetworkTrafficListener> listeners;
public NetworkTrafficSelectChannelEndPoint(SocketChannel channel, SelectorManager.ManagedSelector selectSet, SelectionKey key, long idleTimeout, List<NetworkTrafficListener> listeners) throws IOException
public NetworkTrafficSelectChannelEndPoint(SocketChannel channel, SelectorManager.ManagedSelector selectSet, SelectionKey key, ScheduledExecutorService scheduler, long idleTimeout, List<NetworkTrafficListener> listeners) throws IOException
{
super(channel, selectSet, key, idleTimeout);
super(channel, selectSet, key, scheduler, idleTimeout);
this.listeners = listeners;
}

View File

@ -17,8 +17,12 @@ import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.io.SelectorManager.ManagedSelector;
import org.eclipse.jetty.util.Callback;
@ -32,21 +36,19 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
{
public static final Logger LOG = Log.getLogger(SelectChannelEndPoint.class);
private final SelectorManager.ManagedSelector _selector;
private final SelectionKey _key;
/**
* The desired value for {@link SelectionKey#interestOps()}
*/
private volatile int _interestOps;
private final AtomicReference<Future<?>> _timeout = new AtomicReference<>();
private final Runnable _idleTask = new Runnable()
{
@Override
public void run()
{
checkIdleTimeout();
}
};
/**
* true if {@link ManagedSelector#destroyEndPoint(AsyncEndPoint)} has not been called
*/
private final AtomicBoolean _open = new AtomicBoolean();
private volatile AsyncConnection _connection;
private final ReadInterest _readInterest = new ReadInterest()
{
@Override
@ -56,7 +58,6 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
return false;
}
};
private final WriteFlusher _writeFlusher = new WriteFlusher(this)
{
@Override
@ -65,15 +66,39 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
updateKey(SelectionKey.OP_WRITE, true);
}
};
private final SelectorManager.ManagedSelector _selector;
private final SelectionKey _key;
private final ScheduledExecutorService _scheduler;
private volatile AsyncConnection _connection;
/**
* The desired value for {@link SelectionKey#interestOps()}
*/
private volatile int _interestOps;
public SelectChannelEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key, long idleTimeout) throws IOException
public SelectChannelEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key, ScheduledExecutorService scheduler, long idleTimeout) throws IOException
{
super(channel);
_selector = selector;
_key = key;
_scheduler = scheduler;
setIdleTimeout(idleTimeout);
}
@Override
public void setIdleTimeout(long idleTimeout)
{
super.setIdleTimeout(idleTimeout);
scheduleIdleTimeout(idleTimeout);
}
private void scheduleIdleTimeout(long delay)
{
Future<?> newTimeout = isOpen() && delay > 0 ? _scheduler.schedule(_idleTask, delay, TimeUnit.MILLISECONDS) : null;
Future<?> oldTimeout = _timeout.getAndSet(newTimeout);
if (oldTimeout != null)
oldTimeout.cancel(false);
}
@Override
public <C> void fillInterested(C context, Callback<C> callback) throws IllegalStateException
{
@ -111,32 +136,32 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
_writeFlusher.completeWrite();
}
@Override
public void checkTimeout(long now)
private void checkIdleTimeout()
{
synchronized (this)
{
if (isOutputShutdown() || _readInterest.isInterested() || _writeFlusher.isWriting())
if (isOpen())
{
long idleTimestamp = getIdleTimestamp();
long idleTimeout = getIdleTimeout();
long idleElapsed = System.currentTimeMillis() - idleTimestamp;
long idleLeft = idleTimeout - idleElapsed;
if (isOutputShutdown() || _readInterest.isInterested() || _writeFlusher.isWriting())
{
if (idleTimestamp != 0 && idleTimeout > 0)
{
long idleForMs = now - idleTimestamp;
if (idleForMs > idleTimeout)
if (idleLeft < 0)
{
if (isOutputShutdown())
close();
notIdle();
TimeoutException timeout = new TimeoutException("idle " + idleForMs + "ms");
TimeoutException timeout = new TimeoutException("Idle timeout expired: " + idleElapsed + "/" + idleTimeout + " ms");
_readInterest.failed(timeout);
_writeFlusher.failed(timeout);
}
}
}
scheduleIdleTimeout(idleLeft > 0 ? idleLeft : idleTimeout);
}
}

View File

@ -25,17 +25,15 @@ import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.util.annotation.Name;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.annotation.Name;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.component.AggregateLifeCycle;
import org.eclipse.jetty.util.component.Dumpable;
@ -54,7 +52,6 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
private final ManagedSelector[] _selectors;
private volatile long _selectorIndex;
private volatile long _idleCheckPeriod;
protected SelectorManager()
{
@ -64,23 +61,6 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
protected SelectorManager(@Name(value="selectors") int selectors)
{
_selectors = new ManagedSelector[selectors];
setIdleCheckPeriod(1000);
}
/**
* @return the period, in milliseconds, a background thread checks for idle expiration
*/
public long getIdleCheckPeriod()
{
return _idleCheckPeriod;
}
/**
* @param idleCheckPeriod the period, in milliseconds, a background thread checks for idle expiration
*/
public void setIdleCheckPeriod(long idleCheckPeriod)
{
_idleCheckPeriod = idleCheckPeriod;
}
/**
@ -145,7 +125,6 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
_selectors[i] = selectSet;
selectSet.start();
execute(selectSet);
execute(new Expirer());
}
}
@ -251,37 +230,6 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
AggregateLifeCycle.dump(out, indent, TypeUtil.asList(_selectors));
}
/**
* <p>The task that performs a periodic idle check of the connections managed by the selectors.</p>
* @see #getIdleCheckPeriod()
*/
private class Expirer implements Runnable
{
@Override
public void run()
{
while (isRunning())
{
for (ManagedSelector selector : _selectors)
if (selector != null)
selector.timeoutCheck();
sleep(getIdleCheckPeriod());
}
}
private void sleep(long delay)
{
try
{
Thread.sleep(delay);
}
catch (InterruptedException x)
{
LOG.ignore(x);
}
}
}
/**
* <p>{@link ManagedSelector} wraps a {@link Selector} simplifying non-blocking operations on channels.</p>
* <p>{@link ManagedSelector} runs the select loop, which waits on {@link Selector#select()} until events
@ -291,7 +239,6 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dumpable
{
private final Queue<Runnable> _changes = new ConcurrentLinkedQueue<>();
private final Set<AsyncEndPoint> _endPoints = Collections.newSetFromMap(new ConcurrentHashMap<AsyncEndPoint, Boolean>());
private final int _id;
private Selector _selector;
private Thread _thread;
@ -517,7 +464,6 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
private AsyncEndPoint createEndPoint(SocketChannel channel, SelectionKey selectionKey) throws IOException
{
AsyncEndPoint endPoint = newEndPoint(channel, this, selectionKey);
_endPoints.add(endPoint);
endPointOpened(endPoint);
AsyncConnection asyncConnection = newConnection(channel, endPoint, selectionKey.attachment());
endPoint.setAsyncConnection(asyncConnection);
@ -529,7 +475,6 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
public void destroyEndPoint(AsyncEndPoint endPoint)
{
LOG.debug("Destroyed {}", endPoint);
_endPoints.remove(endPoint);
endPoint.getAsyncConnection().onClose();
endPointClosed(endPoint);
}
@ -597,15 +542,6 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1);
}
private void timeoutCheck()
{
// We cannot use the _selector.keys() because the returned Set is not thread
// safe so it may be modified by the selector thread while we iterate here.
long now = System.currentTimeMillis();
for (AsyncEndPoint endPoint : _endPoints)
endPoint.checkTimeout(now);
}
private class DumpKeys implements Runnable
{
private final CountDownLatch latch = new CountDownLatch(1);

View File

@ -179,11 +179,6 @@ public class SslConnection extends AbstractAsyncConnection
{
}
@Override
public void checkTimeout(long now)
{
}
private final Callback<Void> _writeCallback = new Callback<Void>()
{

View File

@ -1,16 +1,9 @@
package org.eclipse.jetty.io;
import static junit.framework.Assert.assertEquals;
import static org.hamcrest.CoreMatchers.containsString;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.util.BufferUtil;
@ -20,26 +13,33 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static junit.framework.Assert.assertEquals;
import static org.hamcrest.CoreMatchers.containsString;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class AsyncByteArrayEndPointTest
{
ScheduledExecutorService _timer;
private ScheduledExecutorService _scheduler;
@Before
public void before()
{
_timer = new ScheduledThreadPoolExecutor(1);
_scheduler = Executors.newSingleThreadScheduledExecutor();
}
@After
public void after()
{
_timer.shutdownNow();
_scheduler.shutdownNow();
}
@Test
public void testReadable() throws Exception
{
AsyncByteArrayEndPoint endp = new AsyncByteArrayEndPoint(_timer);
AsyncByteArrayEndPoint endp = new AsyncByteArrayEndPoint(_scheduler, 5000);
endp.setInput("test input");
ByteBuffer buffer = BufferUtil.allocate(1024);
@ -92,13 +92,12 @@ public class AsyncByteArrayEndPointTest
{
assertThat(e.toString(), containsString("Closed"));
}
}
@Test
public void testWrite() throws Exception
{
AsyncByteArrayEndPoint endp = new AsyncByteArrayEndPoint(_timer,(byte[])null,15);
AsyncByteArrayEndPoint endp = new AsyncByteArrayEndPoint(_scheduler, 5000, (byte[])null, 15);
endp.setGrowOutput(false);
endp.setOutput(BufferUtil.allocate(10));
@ -126,8 +125,7 @@ public class AsyncByteArrayEndPointTest
@Test
public void testIdle() throws Exception
{
AsyncByteArrayEndPoint endp = new AsyncByteArrayEndPoint(_timer);
endp.setIdleTimeout(500);
AsyncByteArrayEndPoint endp = new AsyncByteArrayEndPoint(_scheduler, 500);
endp.setInput("test");
endp.setGrowOutput(false);
endp.setOutput(BufferUtil.allocate(5));
@ -190,8 +188,5 @@ public class AsyncByteArrayEndPointTest
// idle close
Thread.sleep(1000);
assertFalse(endp.isOpen());
}
}

View File

@ -1,3 +1,19 @@
// ========================================================================
// Copyright 2011-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
//========================================================================
package org.eclipse.jetty.io;
import java.io.BufferedInputStream;
@ -13,6 +29,8 @@ import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.util.BufferUtil;
@ -35,6 +53,7 @@ public class SelectChannelEndPointTest
protected volatile AsyncEndPoint _lastEndp;
protected ServerSocketChannel _connector;
protected QueuedThreadPool _threadPool = new QueuedThreadPool();
protected ScheduledExecutorService _scheduler = Executors.newSingleThreadScheduledExecutor();
protected SelectorManager _manager = new SelectorManager()
{
@Override
@ -52,7 +71,7 @@ public class SelectChannelEndPointTest
@Override
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey selectionKey) throws IOException
{
SelectChannelEndPoint endp = new SelectChannelEndPoint(channel,selectSet, selectionKey, 60000);
SelectChannelEndPoint endp = new SelectChannelEndPoint(channel, selectSet, selectionKey, _scheduler, 60000);
_lastEndp = endp;
return endp;
}
@ -293,11 +312,8 @@ public class SelectChannelEndPointTest
// Read close
assertEquals(-1, client.getInputStream().read());
}
@Test
public void testBlockRead() throws Exception
{
@ -451,7 +467,6 @@ public class SelectChannelEndPointTest
assertFalse(_lastEndp.isOpen());
}
@Test
public void testStress() throws Exception
{
@ -556,7 +571,6 @@ public class SelectChannelEndPointTest
assertEquals(0, latch.getCount());
}
@Test
public void testWriteBlock() throws Exception
{
@ -603,7 +617,5 @@ public class SelectChannelEndPointTest
assert (i++ < 10);
Thread.sleep(10);
}
}
}

View File

@ -10,6 +10,8 @@ import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLSocket;
@ -36,6 +38,7 @@ public class SslConnectionTest
protected volatile AsyncEndPoint _lastEndp;
protected ServerSocketChannel _connector;
protected QueuedThreadPool _threadPool = new QueuedThreadPool();
protected ScheduledExecutorService _scheduler = Executors.newSingleThreadScheduledExecutor();
protected SelectorManager _manager = new SelectorManager()
{
@Override
@ -61,7 +64,7 @@ public class SslConnectionTest
@Override
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey selectionKey) throws IOException
{
SelectChannelEndPoint endp = new SelectChannelEndPoint(channel,selectSet, selectionKey, 60000);
SelectChannelEndPoint endp = new SelectChannelEndPoint(channel,selectSet, selectionKey, _scheduler, 60000);
_lastEndp=endp;
return endp;
}

View File

@ -194,7 +194,7 @@
<!-- in the $JETTY_HOME/contexts directory -->
<!-- -->
<!-- =========================================================== -->
<Call name="addLifeCycle">
<Call name="addBean">
<Arg>
<New class="org.eclipse.jetty.server.deployer.ContextDeployer">
<Set name="contexts"><Ref id="Contexts"/></Set>
@ -217,7 +217,7 @@
<!-- Normally only one type of deployer need be used. -->
<!-- -->
<!-- =========================================================== -->
<Call name="addLifeCycle">
<Call name="addBean">
<Arg>
<New class="org.eclipse.jetty.server.deployer.WebAppDeployer">
<Set name="contexts"><Ref id="Contexts"/></Set>

View File

@ -36,73 +36,51 @@ import org.eclipse.jetty.util.log.Logger;
* <li>Base acceptor thread</li>
* <li>Optional reverse proxy headers checking</li>
* </ul>
*
*
*/
public abstract class AbstractConnector extends AggregateLifeCycle implements Connector, Dumpable
{
static final Logger LOG = Log.getLogger(AbstractConnector.class);
private final Thread[] _acceptors;
private String _name;
private Server _server;
private Executor _executor;
private String _host;
private int _port = 0;
private int _acceptQueueSize = 0;
private int _acceptorPriorityOffset = 0;
private boolean _reuseAddress = true;
private ByteBufferPool _byteBufferPool=new StandardByteBufferPool(); // TODO should this be server wide? or a thread local one?
protected final Logger LOG = Log.getLogger(getClass());
private final Statistics _stats = new ConnectionStatistics();
private final Thread[] _acceptors;
private volatile String _name;
private volatile Server _server;
private volatile Executor _executor;
private volatile int _acceptQueueSize = 128;
private volatile boolean _reuseAddress = true;
private volatile ByteBufferPool _byteBufferPool;
private volatile long _idleTimeout = 200000;
private volatile int _soLingerTime = -1;
protected long _idleTimeout = 200000;
protected int _soLingerTime = -1;
/* ------------------------------------------------------------ */
/**
*/
public AbstractConnector()
{
this(Math.max(1, (Runtime.getRuntime().availableProcessors()) / 4));
}
/* ------------------------------------------------------------ */
/**
*/
public AbstractConnector(@Name(value="acceptors") int acceptors)
public AbstractConnector(@Name("acceptors") int acceptors)
{
if (acceptors > 2 * Runtime.getRuntime().availableProcessors())
LOG.warn("Acceptors should be <= 2*availableProcessors: " + this);
_acceptors = new Thread[acceptors];
}
/* ------------------------------------------------------------ */
@Override
public Statistics getStatistics()
{
return _stats;
}
/* ------------------------------------------------------------ */
/*
*/
@Override
public Server getServer()
{
return _server;
}
/* ------------------------------------------------------------ */
public void setServer(Server server)
{
_server = server;
}
/* ------------------------------------------------------------ */
public Executor findExecutor()
{
if (_executor == null && getServer() != null)
@ -110,14 +88,12 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co
return _executor;
}
/* ------------------------------------------------------------ */
@Override
public Executor getExecutor()
{
return _executor;
}
/* ------------------------------------------------------------ */
public void setExecutor(Executor executor)
{
removeBean(_executor);
@ -125,14 +101,12 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co
addBean(_executor);
}
/* ------------------------------------------------------------ */
@Override
public ByteBufferPool getByteBufferPool()
{
return _byteBufferPool;
}
/* ------------------------------------------------------------ */
public void setByteBufferPool(ByteBufferPool byteBufferPool)
{
removeBean(byteBufferPool);
@ -140,53 +114,6 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co
addBean(_byteBufferPool);
}
/* ------------------------------------------------------------ */
public void setHost(String host)
{
if (this instanceof NetConnector)
_host = host;
else
throw new UnsupportedOperationException();
}
/* ------------------------------------------------------------ */
public String getHost()
{
return _host;
}
/* ------------------------------------------------------------ */
public void setPort(int port)
{
if (this instanceof NetConnector)
_port = port;
else
throw new UnsupportedOperationException();
}
/* ------------------------------------------------------------ */
public int getPort()
{
return _port;
}
/* ------------------------------------------------------------ */
public void open() throws IOException
{
}
/* ------------------------------------------------------------ */
public void close() throws IOException
{
}
/* ------------------------------------------------------------ */
public int getLocalPort()
{
return -1;
}
/* ------------------------------------------------------------ */
/**
* @return Returns the maxIdleTime.
*/
@ -196,7 +123,6 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co
return _idleTimeout;
}
/* ------------------------------------------------------------ */
/**
* Set the maximum Idle time for a connection, which roughly translates to the {@link Socket#setSoTimeout(int)} call, although with NIO implementations
* other mechanisms may be used to implement the timeout. The max idle time is applied:
@ -209,19 +135,17 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co
* timeout (if implemented by jetty) is reset. However, in many instances, the reading/writing is delegated to the JVM, and the semantic is more strictly
* enforced as the maximum time a single read/write operation can take. Note, that as Jetty supports writes of memory mapped file buffers, then a write may
* take many 10s of seconds for large content written to a slow device.
* <p>
* <p/>
* Previously, Jetty supported separate idle timeouts and IO operation timeouts, however the expense of changing the value of soTimeout was significant, so
* these timeouts were merged. With the advent of NIO, it may be possible to again differentiate these values (if there is demand).
*
* @param idleTimeout
* The idleTimeout to set.
* @param idleTimeout The idleTimeout to set.
*/
public void setIdleTimeout(long idleTimeout)
{
_idleTimeout = idleTimeout;
}
/* ------------------------------------------------------------ */
/**
* @return Returns the soLingerTime.
*/
@ -230,7 +154,6 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co
return _soLingerTime;
}
/* ------------------------------------------------------------ */
/**
* @return Returns the acceptQueueSize.
*/
@ -239,17 +162,14 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co
return _acceptQueueSize;
}
/* ------------------------------------------------------------ */
/**
* @param acceptQueueSize
* The acceptQueueSize to set.
* @param acceptQueueSize The acceptQueueSize to set.
*/
public void setAcceptQueueSize(int acceptQueueSize)
{
_acceptQueueSize = acceptQueueSize;
}
/* ------------------------------------------------------------ */
/**
* @return Returns the number of acceptor threads.
*/
@ -259,30 +179,21 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co
}
/* ------------------------------------------------------------ */
/**
* @param soLingerTime
* The soLingerTime to set or -1 to disable.
* @param soLingerTime The soLingerTime to set or -1 to disable.
*/
public void setSoLingerTime(int soLingerTime)
{
_soLingerTime = soLingerTime;
}
/* ------------------------------------------------------------ */
@Override
protected void doStart() throws Exception
{
if (_server == null)
throw new IllegalStateException("No server");
if (_name==null)
_name = (getHost() == null?"0.0.0.0":getHost()) + ":" + getPort();
// open listener port
open();
_name=_name+"/"+getLocalPort();
_byteBufferPool = new StandardByteBufferPool();
super.doStart();
@ -296,19 +207,9 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co
LOG.info("Started {}", this);
}
/* ------------------------------------------------------------ */
@Override
protected void doStop() throws Exception
{
try
{
close();
}
catch (IOException e)
{
LOG.warn(e);
}
super.doStop();
for (Thread thread : _acceptors)
@ -322,7 +223,6 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co
_name = _name.substring(0, i);
}
/* ------------------------------------------------------------ */
public void join() throws InterruptedException
{
for (Thread thread : _acceptors)
@ -330,7 +230,6 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co
thread.join();
}
/* ------------------------------------------------------------ */
protected void configure(Socket socket)
{
try
@ -347,21 +246,8 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co
}
}
/* ------------------------------------------------------------ */
protected abstract void accept(int acceptorID) throws IOException, InterruptedException;
/* ------------------------------------------------------------ */
@Override
public String toString()
{
return String.format("%s@%s:%d",
getClass().getSimpleName(),
getHost()==null?"0.0.0.0":getHost(),
getLocalPort()<=0?getPort():getLocalPort());
}
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
private class Acceptor implements Runnable
{
@ -372,7 +258,6 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co
_acceptor = id;
}
/* ------------------------------------------------------------ */
@Override
public void run()
{
@ -391,7 +276,7 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co
try
{
current.setPriority(old_priority - _acceptorPriorityOffset);
current.setPriority(old_priority);
while (isRunning() && getTransport() != null)
{
try
@ -421,26 +306,22 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co
}
}
/* ------------------------------------------------------------ */
@Override
public String getName()
{
return _name;
}
/* ------------------------------------------------------------ */
public void setName(String name)
{
_name = name;
}
/* ------------------------------------------------------------ */
protected void connectionOpened(AsyncConnection connection)
{
_stats.connectionOpened();
}
/* ------------------------------------------------------------ */
protected void connectionUpgraded(AsyncConnection oldConnection, AsyncConnection newConnection)
{
long duration = System.currentTimeMillis() - oldConnection.getEndPoint().getCreatedTimeStamp();
@ -448,7 +329,6 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co
_stats.connectionUpgraded(duration, requests, requests);
}
/* ------------------------------------------------------------ */
protected void connectionClosed(AsyncConnection connection)
{
long duration = System.currentTimeMillis() - connection.getEndPoint().getCreatedTimeStamp();
@ -457,7 +337,6 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co
_stats.connectionClosed(duration, requests, requests);
}
/* ------------------------------------------------------------ */
/**
* @return True if the the server socket will be opened in SO_REUSEADDR mode.
*/
@ -466,10 +345,8 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co
return _reuseAddress;
}
/* ------------------------------------------------------------ */
/**
* @param reuseAddress
* True if the the server socket will be opened in SO_REUSEADDR mode.
* @param reuseAddress True if the the server socket will be opened in SO_REUSEADDR mode.
*/
public void setReuseAddress(boolean reuseAddress)
{

View File

@ -0,0 +1,100 @@
// ========================================================================
// Copyright (c) 2012-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
package org.eclipse.jetty.server;
import java.io.IOException;
public abstract class AbstractNetConnector extends AbstractConnector implements Connector.NetConnector
{
private volatile String _host;
private volatile int _port = 0;
protected AbstractNetConnector()
{
}
protected AbstractNetConnector(int acceptors)
{
super(acceptors);
}
public void setHost(String host)
{
_host = host;
}
public String getHost()
{
return _host;
}
public void setPort(int port)
{
_port = port;
}
public int getPort()
{
return _port;
}
public int getLocalPort()
{
return -1;
}
@Override
protected void doStart() throws Exception
{
if (getName() == null)
setName(getHost() == null ? "0.0.0.0" : getHost() + ":" + getPort());
open();
setName(getName() + "/" + getLocalPort());
super.doStart();
}
@Override
protected void doStop() throws Exception
{
try
{
close();
}
catch (IOException e)
{
LOG.warn(e);
}
super.doStop();
}
public void open() throws IOException
{
}
public void close() throws IOException
{
}
@Override
public String toString()
{
return String.format("%s@%s:%d",
getClass().getSimpleName(),
getHost() == null ? "0.0.0.0" : getHost(),
getLocalPort() <= 0 ? getPort() : getLocalPort());
}
}

View File

@ -65,7 +65,7 @@ public interface Connector extends LifeCycle
/* ------------------------------------------------------------ */
Statistics getStatistics();
interface NetConnector extends Connector
interface NetConnector extends Connector, AutoCloseable
{
/* ------------------------------------------------------------ */
/**
@ -75,7 +75,7 @@ public interface Connector extends LifeCycle
void open() throws IOException;
/* ------------------------------------------------------------ */
void close();
void close() throws IOException;
/* ------------------------------------------------------------ */
/**

View File

@ -18,8 +18,7 @@ import java.io.InputStream;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.Timer;
import java.util.concurrent.ScheduledExecutorService;
import javax.servlet.DispatcherType;
import javax.servlet.ServletException;
import javax.servlet.ServletInputStream;
@ -55,7 +54,7 @@ public abstract class HttpChannel
{
static final Logger LOG = Log.getLogger(HttpChannel.class);
private static final ThreadLocal<HttpChannel> __currentChannel = new ThreadLocal<HttpChannel>();
private static final ThreadLocal<HttpChannel> __currentChannel = new ThreadLocal<>();
/* ------------------------------------------------------------ */
public static HttpChannel getCurrentHttpChannel()
@ -100,9 +99,6 @@ public abstract class HttpChannel
/* ------------------------------------------------------------ */
/** Constructor
*
*/
public HttpChannel(Server server,AsyncConnection connection,HttpInput input)
{
_server = server;
@ -220,6 +216,7 @@ public abstract class HttpChannel
*
* @return The input stream for this connection.
* The stream will be created if it does not already exist.
* @throws IOException if the InputStream cannot be created
*/
public ServletInputStream getInputStream() throws IOException
{
@ -251,6 +248,7 @@ public abstract class HttpChannel
/* ------------------------------------------------------------ */
/**
* @param charset the character set for the PrintWriter
* @return A {@link PrintWriter} wrapping the {@link #getOutputStream output stream}. The writer is created if it
* does not already exist.
*/
@ -483,9 +481,6 @@ public abstract class HttpChannel
}
/* ------------------------------------------------------------ */
/**
* @see org.eclipse.jetty.io.AsyncConnection#isSuspended()
*/
public boolean isSuspended()
{
return _request.getAsyncContinuation().isSuspended();
@ -576,7 +571,7 @@ public abstract class HttpChannel
break;
default:
String[] values = value.toString().split(",");
String[] values = value.split(",");
for (int i=0;values!=null && i<values.length;i++)
{
expect=HttpHeaderValue.CACHE.get(values[i].trim());
@ -816,18 +811,12 @@ public abstract class HttpChannel
protected abstract void execute(Runnable task);
// TODO replace with ScheduledExecutorService?
// TODO constructor inject
public abstract Timer getTimer();
// TODO use constructor injection ?
public abstract ScheduledExecutorService getScheduler();
/* ------------------------------------------------------------ */
public interface EventHandler extends HttpParser.RequestHandler
{
ResponseInfo commit();
}
}

View File

@ -15,9 +15,9 @@ package org.eclipse.jetty.server;
import java.util.ArrayList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
@ -75,7 +75,7 @@ public class HttpChannelState implements AsyncContext, Continuation
COMPLETECALLED,// complete called
COMPLETING, // Request is completable
COMPLETED // Request is complete
};
}
/* ------------------------------------------------------------ */
private final HttpChannel _channel;
@ -117,7 +117,7 @@ public class HttpChannelState implements AsyncContext, Continuation
synchronized(this)
{
if (_asyncListeners==null)
_asyncListeners=new ArrayList<AsyncListener>();
_asyncListeners=new ArrayList<>();
_asyncListeners.add(listener);
}
}
@ -130,7 +130,7 @@ public class HttpChannelState implements AsyncContext, Continuation
{
// TODO handle the request/response ???
if (_asyncListeners==null)
_asyncListeners=new ArrayList<AsyncListener>();
_asyncListeners=new ArrayList<>();
_asyncListeners.add(listener);
}
}
@ -142,7 +142,7 @@ public class HttpChannelState implements AsyncContext, Continuation
synchronized(this)
{
if (_continuationListeners==null)
_continuationListeners=new ArrayList<ContinuationListener>();
_continuationListeners=new ArrayList<>();
_continuationListeners.add(listener);
}
}
@ -717,12 +717,9 @@ public class HttpChannelState implements AsyncContext, Continuation
/* ------------------------------------------------------------ */
protected void scheduleTimeout()
{
Timer timer = _channel.getTimer();
if (timer!=null)
{
_event._timeout= new AsyncTimeout();
timer.schedule(_event._timeout,_timeoutMs);
}
ScheduledExecutorService scheduler = _channel.getScheduler();
if (scheduler!=null)
_event._timeout=scheduler.schedule(new AsyncTimeout(),_timeoutMs,TimeUnit.MILLISECONDS);
}
/* ------------------------------------------------------------ */
@ -731,9 +728,9 @@ public class HttpChannelState implements AsyncContext, Continuation
AsyncEventState event=_event;
if (event!=null)
{
TimerTask task=event._timeout;
Future<?> task=event._timeout;
if (task!=null)
task.cancel();
task.cancel(false);
}
}
@ -1022,7 +1019,7 @@ public class HttpChannelState implements AsyncContext, Continuation
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
public class AsyncTimeout extends TimerTask
public class AsyncTimeout implements Runnable
{
@Override
public void run()
@ -1035,7 +1032,7 @@ public class HttpChannelState implements AsyncContext, Continuation
/* ------------------------------------------------------------ */
public class AsyncEventState extends AsyncEvent
{
private TimerTask _timeout;
private Future<?> _timeout;
private final ServletContext _suspendedContext;
private ServletContext _dispatchContext;
private String _pathInContext;

View File

@ -15,9 +15,9 @@ package org.eclipse.jetty.server;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Timer;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import org.eclipse.jetty.http.HttpGenerator;
import org.eclipse.jetty.http.HttpGenerator.Action;
@ -41,7 +41,7 @@ public class HttpConnection extends AbstractAsyncConnection
{
public static final Logger LOG = Log.getLogger(HttpConnection.class);
private static final ThreadLocal<HttpConnection> __currentConnection = new ThreadLocal<HttpConnection>();
private static final ThreadLocal<HttpConnection> __currentConnection = new ThreadLocal<>();
public static final String UPGRADE_CONNECTION_ATTR = "org.eclispe.jetty.server.HttpConnection.UPGRADE";
@ -76,17 +76,12 @@ public class HttpConnection extends AbstractAsyncConnection
}
/* ------------------------------------------------------------ */
/** Constructor
*
*/
public HttpConnection(HttpConnector connector, AsyncEndPoint endpoint, Server server)
{
super(endpoint,connector.findExecutor());
_connector = connector;
_bufferPool=_connector.getByteBufferPool();
if (_bufferPool==null)
new Throwable().printStackTrace();
_server = server;
@ -696,9 +691,9 @@ public class HttpConnection extends AbstractAsyncConnection
}
@Override
public Timer getTimer()
public ScheduledExecutorService getScheduler()
{
return _connector.getTimer();
return _connector.getScheduler();
}
@Override
@ -747,7 +742,7 @@ public class HttpConnection extends AbstractAsyncConnection
return fcb;
}
};
}
private class HttpHttpInput extends HttpInput
{

View File

@ -2,8 +2,9 @@ package org.eclipse.jetty.server;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Timer;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import javax.servlet.ServletRequest;
import org.eclipse.jetty.http.HttpFields;
@ -11,7 +12,7 @@ import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpScheme;
public abstract class HttpConnector extends AbstractConnector
public abstract class HttpConnector extends AbstractNetConnector
{
private String _integralScheme = HttpScheme.HTTPS.asString();
private int _integralPort = 0;
@ -19,24 +20,21 @@ public abstract class HttpConnector extends AbstractConnector
private int _confidentialPort = 0;
private boolean _forwarded;
private String _hostHeader;
private Timer _timer = new Timer(true);
private ScheduledExecutorService _scheduler;
private boolean _shutdownScheduler;
private String _forwardedHostHeader = HttpHeader.X_FORWARDED_HOST.toString();
private String _forwardedServerHeader = HttpHeader.X_FORWARDED_SERVER.toString();
private String _forwardedForHeader = HttpHeader.X_FORWARDED_FOR.toString();
private String _forwardedProtoHeader = HttpHeader.X_FORWARDED_PROTO.toString();
private String _forwardedCipherSuiteHeader;
private String _forwardedSslSessionIdHeader;
private int _requestHeaderSize=6*1024;;
private int _requestHeaderSize=6*1024;
private int _requestBufferSize=16*1024;
private int _responseHeaderSize=6*1024;
private int _responseBufferSize=16*1024;
public HttpConnector()
{
super();
}
public HttpConnector(int acceptors)
@ -44,27 +42,43 @@ public abstract class HttpConnector extends AbstractConnector
super(acceptors);
}
public ScheduledExecutorService getScheduler()
{
return _scheduler;
}
public void setScheduler(ScheduledExecutorService scheduler)
{
_scheduler = scheduler;
}
@Override
protected void doStart() throws Exception
{
super.doStart();
_timer=new Timer("Timer-"+getName(),true);
if (_scheduler == null)
{
_scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory()
{
@Override
public Thread newThread(Runnable r)
{
return new Thread(r, "Timer-" + HttpConnector.this.getName());
}
});
_shutdownScheduler = true;
}
}
@Override
protected void doStop() throws Exception
{
if (_timer!=null)
_timer.cancel();
_timer=null;
if (_shutdownScheduler)
_scheduler.shutdownNow();
_scheduler = null;
super.doStop();
}
public Timer getTimer()
{
return _timer;
}
public int getRequestHeaderSize()
{
return _requestHeaderSize;
@ -307,12 +321,11 @@ public abstract class HttpConnector extends AbstractConnector
* Set reverse proxy handling. If set to true, then the X-Forwarded headers (or the headers set in their place) are looked for to set the request protocol,
* host, server and client ip.
*
* @param check
* true if this connector is checking the x-forwarded-for/host/server headers
* @set {@link #setForwardedForHeader(String)}
* @set {@link #setForwardedHostHeader(String)}
* @set {@link #setForwardedProtoHeader(String)}
* @set {@link #setForwardedServerHeader(String)}
* @param check true if this connector is checking the x-forwarded-for/host/server headers
* @see #setForwardedForHeader(String)
* @see #setForwardedHostHeader(String)
* @see #setForwardedProtoHeader(String)
* @see #setForwardedServerHeader(String)
*/
public void setForwarded(boolean check)
{
@ -384,6 +397,7 @@ public abstract class HttpConnector extends AbstractConnector
/* ------------------------------------------------------------ */
/**
* @return the forwarded for header
* @see #setForwarded(boolean)
*/
public String getForwardedForHeader()
@ -464,6 +478,4 @@ public abstract class HttpConnector extends AbstractConnector
{
_forwardedSslSessionIdHeader = forwardedSslSessionId;
}
}

View File

@ -15,14 +15,11 @@ package org.eclipse.jetty.server;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Timer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Phaser;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.io.AsyncByteArrayEndPoint;
@ -34,9 +31,9 @@ import org.eclipse.jetty.util.log.Logger;
public class LocalHttpConnector extends HttpConnector
{
private static final Logger LOG = Log.getLogger(LocalHttpConnector.class);
private final BlockingQueue<LocalEndPoint> _connects = new LinkedBlockingQueue<LocalEndPoint>();
private ScheduledExecutorService _timer;
private LocalExecutor _executor;
private final BlockingQueue<LocalEndPoint> _connects = new LinkedBlockingQueue<>();
private volatile LocalExecutor _executor;
public LocalHttpConnector()
{
@ -49,12 +46,12 @@ public class LocalHttpConnector extends HttpConnector
return this;
}
/** Sends requests and get's responses based on thread activity.
/** Sends requests and get responses based on thread activity.
* Returns all the responses received once the thread activity has
* returned to the level it was before the requests.
* @param requests
* @return
* @throws Exception
* @param requests the requests
* @return the responses
* @throws Exception if the requests fail
*/
public String getResponses(String requests) throws Exception
{
@ -65,6 +62,9 @@ public class LocalHttpConnector extends HttpConnector
/** Sends requests and get's responses based on thread activity.
* Returns all the responses received once the thread activity has
* returned to the level it was before the requests.
* @param requestsBuffer the requests
* @return the responses
* @throws Exception if the requests fail
*/
public ByteBuffer getResponses(ByteBuffer requestsBuffer) throws Exception
{
@ -81,8 +81,8 @@ public class LocalHttpConnector extends HttpConnector
/**
* Execute a request and return the EndPoint through which
* responses can be received.
* @param rawRequest
* @return
* @param rawRequest the request
* @return the local endpoint
*/
public LocalEndPoint executeRequest(String rawRequest)
{
@ -111,7 +111,6 @@ public class LocalHttpConnector extends HttpConnector
protected void doStart() throws Exception
{
super.doStart();
_timer=new ScheduledThreadPoolExecutor(1);
_executor=new LocalExecutor(findExecutor());
}
@ -119,7 +118,6 @@ public class LocalHttpConnector extends HttpConnector
protected void doStop() throws Exception
{
super.doStop();
_timer.shutdownNow();
_executor=null;
}
@ -175,9 +173,8 @@ public class LocalHttpConnector extends HttpConnector
public LocalEndPoint()
{
super(_timer);
super(getScheduler(), LocalHttpConnector.this.getIdleTimeout());
setGrowOutput(true);
setIdleTimeout(LocalHttpConnector.this.getIdleTimeout());
}
public void addInput(String s)

View File

@ -50,8 +50,6 @@ import org.eclipse.jetty.server.Connector.NetConnector;
* thus if possible it should be read after the continuation or saved as a request attribute or as the
* associated object of the Continuation instance.
* </p>
*
* @org.apache.xbean.XBean element="nioConnector" description="Creates an NIO based socket connector"
*/
public class SelectChannelConnector extends HttpConnector implements NetConnector
{
@ -96,23 +94,16 @@ public class SelectChannelConnector extends HttpConnector implements NetConnecto
}
@Override
public void close()
public void close() throws IOException
{
synchronized(this)
{
if (_acceptChannel != null)
{
removeBean(_acceptChannel);
try
{
if (_acceptChannel.isOpen())
_acceptChannel.close();
}
catch(IOException e)
{
LOG.warn(e);
}
}
_acceptChannel = null;
_localPort=-2;
}
@ -182,7 +173,7 @@ public class SelectChannelConnector extends HttpConnector implements NetConnecto
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException
{
return new SelectChannelEndPoint(channel,selectSet,key, this._idleTimeout);
return new SelectChannelEndPoint(channel,selectSet,key, getScheduler(), getIdleTimeout());
}
protected void endPointClosed(AsyncEndPoint endpoint)

View File

@ -55,7 +55,7 @@ public class NetworkTrafficSelectChannelConnector extends SelectChannelConnector
@Override
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.ManagedSelector selectSet, SelectionKey key) throws IOException
{
NetworkTrafficSelectChannelEndPoint endPoint = new NetworkTrafficSelectChannelEndPoint(channel, selectSet, key, getIdleTimeout(), listeners);
NetworkTrafficSelectChannelEndPoint endPoint = new NetworkTrafficSelectChannelEndPoint(channel, selectSet, key, getScheduler(), getIdleTimeout(), listeners);
endPoint.notifyOpened();
return endPoint;
}

View File

@ -12,11 +12,9 @@ package org.eclipse.jetty.server;
//You may elect to redistribute this code under either of these licenses.
//========================================================================
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Timer;
import java.util.concurrent.ScheduledExecutorService;
import org.eclipse.jetty.http.HttpGenerator.ResponseInfo;
import org.eclipse.jetty.util.BufferUtil;
@ -26,6 +24,8 @@ import org.eclipse.jetty.util.Utf8StringBuilder;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
public class HttpWriterTest
{
private HttpWriter _writer;
@ -94,7 +94,7 @@ public class HttpWriterTest
}
@Override
public Timer getTimer()
public ScheduledExecutorService getScheduler()
{
return null;
}

View File

@ -13,11 +13,6 @@
package org.eclipse.jetty.server;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.LineNumberReader;
@ -26,11 +21,9 @@ import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.Locale;
import java.util.Timer;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import javax.servlet.ServletException;
import javax.servlet.http.Cookie;
import javax.servlet.http.HttpServletRequest;
@ -52,6 +45,11 @@ import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
*
*/
@ -72,7 +70,7 @@ public class ResponseTest
_server.start();
_timer=new ScheduledThreadPoolExecutor(1);
AsyncByteArrayEndPoint endp = new AsyncByteArrayEndPoint(_timer);
AsyncByteArrayEndPoint endp = new AsyncByteArrayEndPoint(_timer,5000);
HttpInput input = new HttpInput();
AsyncConnection connection = new AbstractAsyncConnection(endp,new Executor()
{
@ -114,7 +112,7 @@ public class ResponseTest
}
@Override
public Timer getTimer()
public ScheduledExecutorService getScheduler()
{
// TODO Auto-generated method stub
return null;

View File

@ -13,9 +13,6 @@
package org.eclipse.jetty.server;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
@ -23,7 +20,6 @@ import java.io.PrintWriter;
import java.net.Socket;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -40,12 +36,15 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class SelectChannelStatisticsTest
{
private static final Logger LOG = Log.getLogger(SelectChannelStatisticsTest.class);
private static Server _server;
private static AbstractConnector _connector;
private static AbstractNetConnector _connector;
private static CyclicBarrier _connect;
private static CountDownLatch _closed;

View File

@ -41,53 +41,55 @@ import org.eclipse.jetty.websocket.driver.WebSocketEventDriver;
public class WebSocketClientFactory extends AggregateLifeCycle
{
private static final Logger LOG = Log.getLogger(WebSocketClientFactory.class);
/**
* Have the factory maintain 1 and only 1 scheduler. All connections share this scheduler.
*/
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final Queue<WebSocketConnection> connections = new ConcurrentLinkedQueue<>();
private final ByteBufferPool bufferPool = new StandardByteBufferPool();
private final Executor executor;
private final ScheduledExecutorService scheduler;
private final WebSocketClientSelectorManager selector;
private final EventMethodsCache methodsCache;
private final WebSocketPolicy policy;
public WebSocketClientFactory()
{
this(new QueuedThreadPool(),null);
this(new QueuedThreadPool());
}
public WebSocketClientFactory(Executor threadPool)
{
this(threadPool,null);
}
public WebSocketClientFactory(Executor executor, SslContextFactory sslContextFactory)
{
if (executor == null)
{
throw new IllegalArgumentException("Executor is required");
}
this.executor = executor;
addBean(executor);
if (sslContextFactory != null)
{
addBean(sslContextFactory);
}
this.policy = WebSocketPolicy.newClientPolicy();
selector = new WebSocketClientSelectorManager(bufferPool,executor,policy);
selector.setSslContextFactory(sslContextFactory);
addBean(selector);
this.methodsCache = new EventMethodsCache();
this(threadPool, Executors.newSingleThreadScheduledExecutor());
}
public WebSocketClientFactory(SslContextFactory sslContextFactory)
{
this(null,sslContextFactory);
this(new QueuedThreadPool(), Executors.newSingleThreadScheduledExecutor(), sslContextFactory);
}
public WebSocketClientFactory(Executor threadPool, ScheduledExecutorService scheduler)
{
this(threadPool, scheduler, null);
}
public WebSocketClientFactory(Executor executor, ScheduledExecutorService scheduler, SslContextFactory sslContextFactory)
{
if (executor == null)
throw new IllegalArgumentException("Executor is required");
this.executor = executor;
addBean(executor);
if (scheduler == null)
throw new IllegalArgumentException("Scheduler is required");
this.scheduler = scheduler;
if (sslContextFactory != null)
addBean(sslContextFactory);
this.policy = WebSocketPolicy.newClientPolicy();
selector = new WebSocketClientSelectorManager(bufferPool, executor, scheduler, policy);
selector.setSslContextFactory(sslContextFactory);
addBean(selector);
this.methodsCache = new EventMethodsCache();
}
private void closeConnections()

View File

@ -39,16 +39,18 @@ import org.eclipse.jetty.websocket.io.WebSocketAsyncConnection;
public class WebSocketClientSelectorManager extends SelectorManager
{
private SslContextFactory sslContextFactory;
private final Executor executor;
private final ScheduledExecutorService scheduler;
private final WebSocketPolicy policy;
private final ByteBufferPool bufferPool;
private SslContextFactory sslContextFactory;
public WebSocketClientSelectorManager(ByteBufferPool bufferPool, Executor executor, WebSocketPolicy policy)
public WebSocketClientSelectorManager(ByteBufferPool bufferPool, Executor executor, ScheduledExecutorService scheduler, WebSocketPolicy policy)
{
super();
this.bufferPool = bufferPool;
this.executor = executor;
this.scheduler = scheduler;
this.policy = policy;
}
@ -135,7 +137,7 @@ public class WebSocketClientSelectorManager extends SelectorManager
@Override
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey selectionKey) throws IOException
{
return new SelectChannelEndPoint(channel,selectSet, selectionKey, policy.getIdleTimeout());
return new SelectChannelEndPoint(channel,selectSet, selectionKey, scheduler, policy.getIdleTimeout());
}
public SSLEngine newSSLEngine(SslContextFactory sslContextFactory, SocketChannel channel)

View File

@ -88,7 +88,7 @@
</New>
</Set>
<Call name="addLifeCycle">
<Call name="addBean">
<Arg>
<New class="org.eclipse.jetty.deploy.ContextDeployer">
<Set name="contexts"><Ref id="WebappContexts"/></Set>