Guarding against NPE in case of concurrent stop().
This commit is contained in:
parent
55d229e73c
commit
9c43e04bb4
|
@ -4,11 +4,11 @@
|
|||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
// The Eclipse Public License is available at
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
|
||||
package org.eclipse.jetty.io.nio;
|
||||
|
@ -30,7 +30,6 @@ import java.util.concurrent.ConcurrentMap;
|
|||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
||||
import org.eclipse.jetty.io.ConnectedEndPoint;
|
||||
import org.eclipse.jetty.io.Connection;
|
||||
import org.eclipse.jetty.io.EndPoint;
|
||||
|
@ -55,7 +54,7 @@ import org.eclipse.jetty.util.thread.Timeout.Task;
|
|||
public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable
|
||||
{
|
||||
public static final Logger LOG=Log.getLogger("org.eclipse.jetty.io.nio");
|
||||
|
||||
|
||||
// TODO Tune these by approx system speed.
|
||||
private static final int __JVMBUG_THRESHHOLD=Integer.getInteger("org.eclipse.jetty.io.nio.JVMBUG_THRESHHOLD",0).intValue();
|
||||
private static final int __MONITOR_PERIOD=Integer.getInteger("org.eclipse.jetty.io.nio.MONITOR_PERIOD",1000).intValue();
|
||||
|
@ -63,7 +62,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
private static final int __BUSY_PAUSE=Integer.getInteger("org.eclipse.jetty.io.nio.BUSY_PAUSE",50).intValue();
|
||||
private static final int __BUSY_KEY=Integer.getInteger("org.eclipse.jetty.io.nio.BUSY_KEY",-1).intValue();
|
||||
private static final int __IDLE_TICK=Integer.getInteger("org.eclipse.jetty.io.nio.IDLE_TICK",400).intValue();
|
||||
|
||||
|
||||
private int _maxIdleTime;
|
||||
private int _lowResourcesMaxIdleTime;
|
||||
private long _lowResourcesConnections;
|
||||
|
@ -72,7 +71,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
private volatile int _set;
|
||||
private boolean _deferringInterestedOps0=true;
|
||||
private int _selectorPriorityDelta=0;
|
||||
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* @param maxIdleTime The maximum period in milli seconds that a connection may be idle before it is closed.
|
||||
|
@ -82,18 +81,18 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
{
|
||||
_maxIdleTime=(int)maxIdleTime;
|
||||
}
|
||||
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* @param selectSets number of select sets to create
|
||||
*/
|
||||
public void setSelectSets(int selectSets)
|
||||
{
|
||||
long lrc = _lowResourcesConnections * _selectSets;
|
||||
long lrc = _lowResourcesConnections * _selectSets;
|
||||
_selectSets=selectSets;
|
||||
_lowResourcesConnections=lrc/_selectSets;
|
||||
}
|
||||
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* @return the max idle time
|
||||
|
@ -102,7 +101,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
{
|
||||
return _maxIdleTime;
|
||||
}
|
||||
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* @return the number of select sets in use
|
||||
|
@ -114,14 +113,14 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* @param i
|
||||
* @param i
|
||||
* @return The select set
|
||||
*/
|
||||
public SelectSet getSelectSet(int i)
|
||||
{
|
||||
return _selectSet[i];
|
||||
}
|
||||
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/** Register a channel
|
||||
* @param channel
|
||||
|
@ -132,8 +131,8 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
// The ++ increment here is not atomic, but it does not matter.
|
||||
// so long as the value changes sometimes, then connections will
|
||||
// be distributed over the available sets.
|
||||
|
||||
int s=_set++;
|
||||
|
||||
int s=_set++;
|
||||
s=s%_selectSets;
|
||||
SelectSet[] sets=_selectSet;
|
||||
if (sets!=null)
|
||||
|
@ -144,7 +143,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/** Register a channel
|
||||
* @param channel
|
||||
|
@ -154,8 +153,8 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
// The ++ increment here is not atomic, but it does not matter.
|
||||
// so long as the value changes sometimes, then connections will
|
||||
// be distributed over the available sets.
|
||||
|
||||
int s=_set++;
|
||||
|
||||
int s=_set++;
|
||||
s=s%_selectSets;
|
||||
SelectSet[] sets=_selectSet;
|
||||
if (sets!=null)
|
||||
|
@ -165,14 +164,14 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
set.wakeup();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/** Register a {@link ServerSocketChannel}
|
||||
* @param acceptChannel
|
||||
*/
|
||||
public void register(ServerSocketChannel acceptChannel)
|
||||
{
|
||||
int s=_set++;
|
||||
int s=_set++;
|
||||
s=s%_selectSets;
|
||||
SelectSet set=_selectSet[s];
|
||||
set.addChange(acceptChannel);
|
||||
|
@ -196,8 +195,8 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
{
|
||||
_selectorPriorityDelta=delta;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* @return the lowResourcesConnections
|
||||
|
@ -237,7 +236,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
{
|
||||
_lowResourcesMaxIdleTime=(int)lowResourcesMaxIdleTime;
|
||||
}
|
||||
|
||||
|
||||
|
||||
/* ------------------------------------------------------------------------------- */
|
||||
public abstract boolean dispatch(Runnable task);
|
||||
|
@ -254,7 +253,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
_selectSet[i]= new SelectSet(i);
|
||||
|
||||
super.doStart();
|
||||
|
||||
|
||||
// start a thread to Select
|
||||
for (int i=0;i<getSelectSets();i++)
|
||||
{
|
||||
|
@ -271,7 +270,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
if (sets==null)
|
||||
return;
|
||||
SelectSet set=sets[id];
|
||||
|
||||
|
||||
Thread.currentThread().setName(name+" Selector"+id);
|
||||
if (getSelectorPriorityDelta()!=0)
|
||||
Thread.currentThread().setPriority(Thread.currentThread().getPriority()+getSelectorPriorityDelta());
|
||||
|
@ -362,7 +361,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
LOG.warn(ex+","+channel+","+attachment);
|
||||
LOG.debug(ex);
|
||||
}
|
||||
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
public String dump()
|
||||
{
|
||||
|
@ -375,8 +374,8 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
out.append(String.valueOf(this)).append("\n");
|
||||
AggregateLifeCycle.dump(out,indent,TypeUtil.asList(_selectSet));
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
/* ------------------------------------------------------------------------------- */
|
||||
/* ------------------------------------------------------------------------------- */
|
||||
/* ------------------------------------------------------------------------------- */
|
||||
|
@ -384,11 +383,11 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
{
|
||||
private final int _setID;
|
||||
private final Timeout _timeout;
|
||||
|
||||
|
||||
private final ConcurrentLinkedQueue<Object> _changes = new ConcurrentLinkedQueue<Object>();
|
||||
|
||||
|
||||
private volatile Selector _selector;
|
||||
|
||||
|
||||
private volatile Thread _selecting;
|
||||
private int _jvmBug;
|
||||
private int _selects;
|
||||
|
@ -404,7 +403,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
private int _jvmFix2;
|
||||
private volatile long _idleTick;
|
||||
private ConcurrentMap<SelectChannelEndPoint,Object> _endPoints = new ConcurrentHashMap<SelectChannelEndPoint, Object>();
|
||||
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
SelectSet(int acceptorID) throws Exception
|
||||
{
|
||||
|
@ -420,7 +419,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
_monitorNext=_monitorStart+__MONITOR_PERIOD;
|
||||
_log=_monitorStart+60000;
|
||||
}
|
||||
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
public void addChange(Object change)
|
||||
{
|
||||
|
@ -429,7 +428,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
|
||||
/* ------------------------------------------------------------ */
|
||||
public void addChange(SelectableChannel channel, Object att)
|
||||
{
|
||||
{
|
||||
if (att==null)
|
||||
addChange(channel);
|
||||
else if (att instanceof EndPoint)
|
||||
|
@ -437,11 +436,11 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
else
|
||||
addChange(new ChannelAndAttachment(channel,att));
|
||||
}
|
||||
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* Select and dispatch tasks found from changes and the selector.
|
||||
*
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
public void doSelect() throws IOException
|
||||
|
@ -450,6 +449,9 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
{
|
||||
_selecting=Thread.currentThread();
|
||||
final Selector selector=_selector;
|
||||
// Stopped concurrently ?
|
||||
if (selector == null)
|
||||
return;
|
||||
|
||||
// Make any key changes required
|
||||
Object change;
|
||||
|
@ -458,7 +460,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
{
|
||||
Channel ch=null;
|
||||
SelectionKey key=null;
|
||||
|
||||
|
||||
try
|
||||
{
|
||||
if (change instanceof EndPoint)
|
||||
|
@ -475,7 +477,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
final SelectableChannel channel=asc._channel;
|
||||
ch=channel;
|
||||
final Object att = asc._attachment;
|
||||
|
||||
|
||||
if ((channel instanceof SocketChannel) && ((SocketChannel)channel).isConnected())
|
||||
{
|
||||
key = channel.register(selector,SelectionKey.OP_READ,att);
|
||||
|
@ -517,7 +519,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
{
|
||||
if (e instanceof ThreadDeath)
|
||||
throw (ThreadDeath)e;
|
||||
|
||||
|
||||
if (isRunning())
|
||||
LOG.warn(e);
|
||||
else
|
||||
|
@ -540,7 +542,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
_selects++;
|
||||
|
||||
long now=System.currentTimeMillis();
|
||||
|
||||
|
||||
// if no immediate things to do
|
||||
if (selected==0 && selector.selectedKeys().isEmpty())
|
||||
{
|
||||
|
@ -562,7 +564,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
_timeout.setNow(now);
|
||||
long to_next_timeout=_timeout.getTimeToNext();
|
||||
|
||||
long wait = _changes.size()==0?__IDLE_TICK:0L;
|
||||
long wait = _changes.size()==0?__IDLE_TICK:0L;
|
||||
if (wait > 0 && to_next_timeout >= 0 && wait > to_next_timeout)
|
||||
wait = to_next_timeout;
|
||||
|
||||
|
@ -574,21 +576,21 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
_selects++;
|
||||
now = System.currentTimeMillis();
|
||||
_timeout.setNow(now);
|
||||
|
||||
|
||||
if (__JVMBUG_THRESHHOLD>0)
|
||||
checkJvmBugs(before, now, wait, selected);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// have we been destroyed while sleeping
|
||||
if (_selector==null || !selector.isOpen())
|
||||
return;
|
||||
|
||||
// Look for things to do
|
||||
for (SelectionKey key: selector.selectedKeys())
|
||||
{
|
||||
{
|
||||
SocketChannel channel=null;
|
||||
|
||||
|
||||
try
|
||||
{
|
||||
if (!key.isValid())
|
||||
|
@ -641,7 +643,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
SelectChannelEndPoint endpoint = createEndPoint(channel,key);
|
||||
key.attach(endpoint);
|
||||
if (key.isReadable())
|
||||
endpoint.schedule();
|
||||
endpoint.schedule();
|
||||
}
|
||||
key = null;
|
||||
}
|
||||
|
@ -665,15 +667,15 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
{
|
||||
LOG.debug(e2);
|
||||
}
|
||||
|
||||
|
||||
if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid())
|
||||
key.cancel();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Everything always handled
|
||||
selector.selectedKeys().clear();
|
||||
|
||||
|
||||
now=System.currentTimeMillis();
|
||||
_timeout.setNow(now);
|
||||
Task task = _timeout.expired();
|
||||
|
@ -688,11 +690,11 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
if (now-_idleTick>__IDLE_TICK)
|
||||
{
|
||||
_idleTick=now;
|
||||
|
||||
|
||||
final long idle_now=((_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections))
|
||||
?(now+_maxIdleTime-_lowResourcesMaxIdleTime)
|
||||
:now;
|
||||
|
||||
|
||||
dispatch(new Runnable()
|
||||
{
|
||||
public void run()
|
||||
|
@ -721,7 +723,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
_selecting=null;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
private void checkJvmBugs(long before, long now, long wait, int selected)
|
||||
throws IOException
|
||||
|
@ -729,7 +731,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
Selector selector = _selector;
|
||||
if (selector==null)
|
||||
return;
|
||||
|
||||
|
||||
// Look for JVM bugs over a monitor period.
|
||||
// http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6403933
|
||||
// http://bugs.sun.com/view_bug.do?bug_id=6693490
|
||||
|
@ -748,7 +750,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
|
||||
if (now>_log)
|
||||
{
|
||||
if (_paused>0)
|
||||
if (_paused>0)
|
||||
LOG.debug(this+" Busy selector - injecting delay "+_paused+" times");
|
||||
|
||||
if (_jvmFix2>0)
|
||||
|
@ -842,9 +844,9 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
_busyKey=busy;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
private void renewSelector()
|
||||
private void renewSelector()
|
||||
{
|
||||
try
|
||||
{
|
||||
|
@ -876,7 +878,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
throw new RuntimeException("recreating selector",e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
public SelectorManager getManager()
|
||||
{
|
||||
|
@ -891,9 +893,9 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
|
||||
/* ------------------------------------------------------------ */
|
||||
/**
|
||||
* @param task The task to timeout. If it implements Runnable, then
|
||||
* @param task The task to timeout. If it implements Runnable, then
|
||||
* expired will be called from a dispatched thread.
|
||||
*
|
||||
*
|
||||
* @param timeoutMs
|
||||
*/
|
||||
public void scheduleTimeout(Timeout.Task task, long timeoutMs)
|
||||
|
@ -902,7 +904,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
throw new IllegalArgumentException("!Runnable");
|
||||
_timeout.schedule(task, timeoutMs);
|
||||
}
|
||||
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
public void cancelTimeout(Timeout.Task task)
|
||||
{
|
||||
|
@ -927,20 +929,20 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
renewSelector();
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
renewSelector();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
private SelectChannelEndPoint createEndPoint(SocketChannel channel, SelectionKey sKey) throws IOException
|
||||
{
|
||||
SelectChannelEndPoint endp = newEndPoint(channel,this,sKey);
|
||||
endPointOpened(endp);
|
||||
endPointOpened(endp);
|
||||
_endPoints.put(endp,this);
|
||||
return endp;
|
||||
}
|
||||
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
public void destroyEndPoint(SelectChannelEndPoint endp)
|
||||
{
|
||||
|
@ -953,11 +955,11 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
{
|
||||
return _selector;
|
||||
}
|
||||
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
void stop() throws Exception
|
||||
{
|
||||
// Spin for a while waiting for selector to complete
|
||||
// Spin for a while waiting for selector to complete
|
||||
// to avoid unneccessary closed channel exceptions
|
||||
try
|
||||
{
|
||||
|
@ -994,8 +996,8 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
_timeout.cancelAll();
|
||||
try
|
||||
{
|
||||
|
@ -1006,7 +1008,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
catch (IOException e)
|
||||
{
|
||||
LOG.ignore(e);
|
||||
}
|
||||
}
|
||||
_selector=null;
|
||||
}
|
||||
}
|
||||
|
@ -1021,9 +1023,9 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
public void dump(Appendable out, String indent) throws IOException
|
||||
{
|
||||
out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_setID)).append("\n");
|
||||
|
||||
|
||||
Thread selecting = _selecting;
|
||||
|
||||
|
||||
Object where = "not selecting";
|
||||
StackTraceElement[] trace =selecting==null?null:selecting.getStackTrace();
|
||||
if (trace!=null)
|
||||
|
@ -1039,9 +1041,9 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
Selector selector=_selector;
|
||||
final ArrayList<Object> dump = new ArrayList<Object>(selector.keys().size()*2);
|
||||
dump.add(where);
|
||||
|
||||
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
|
||||
|
||||
addChange(new Runnable(){
|
||||
public void run()
|
||||
{
|
||||
|
@ -1049,7 +1051,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
latch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
try
|
||||
{
|
||||
latch.await(5,TimeUnit.SECONDS);
|
||||
|
@ -1081,7 +1083,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
{
|
||||
final SelectableChannel _channel;
|
||||
final Object _attachment;
|
||||
|
||||
|
||||
public ChannelAndAttachment(SelectableChannel channel, Object attachment)
|
||||
{
|
||||
super();
|
||||
|
@ -1101,12 +1103,12 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
|
|||
{
|
||||
_deferringInterestedOps0 = deferringInterestedOps0;
|
||||
}
|
||||
|
||||
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/* ------------------------------------------------------------ */
|
||||
/* ------------------------------------------------------------ */
|
||||
private interface ChangeTask extends Runnable
|
||||
{}
|
||||
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue