* Issue #1939 fair selector choice Signed-off-by: Greg Wilkins <gregw@webtide.com> * Issue #1939 removed address based heuristic for choosing selector Signed-off-by: Greg Wilkins <gregw@webtide.com> * Issue #1939 removed println Signed-off-by: Greg Wilkins <gregw@webtide.com> * Issue #1939 use lambda syntax sugar Signed-off-by: Greg Wilkins <gregw@webtide.com>
This commit is contained in:
parent
0ef51b67dc
commit
6fb7c9ec2e
|
@ -20,7 +20,6 @@ package org.eclipse.jetty.io;
|
||||||
|
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
|
||||||
import java.net.Socket;
|
import java.net.Socket;
|
||||||
import java.net.SocketAddress;
|
import java.net.SocketAddress;
|
||||||
import java.nio.channels.SelectableChannel;
|
import java.nio.channels.SelectableChannel;
|
||||||
|
@ -29,6 +28,8 @@ import java.nio.channels.Selector;
|
||||||
import java.nio.channels.ServerSocketChannel;
|
import java.nio.channels.ServerSocketChannel;
|
||||||
import java.nio.channels.SocketChannel;
|
import java.nio.channels.SocketChannel;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import java.util.function.IntUnaryOperator;
|
||||||
|
|
||||||
import org.eclipse.jetty.util.annotation.ManagedAttribute;
|
import org.eclipse.jetty.util.annotation.ManagedAttribute;
|
||||||
import org.eclipse.jetty.util.annotation.ManagedObject;
|
import org.eclipse.jetty.util.annotation.ManagedObject;
|
||||||
|
@ -58,8 +59,9 @@ public abstract class SelectorManager extends ContainerLifeCycle implements Dump
|
||||||
private final Executor executor;
|
private final Executor executor;
|
||||||
private final Scheduler scheduler;
|
private final Scheduler scheduler;
|
||||||
private final ManagedSelector[] _selectors;
|
private final ManagedSelector[] _selectors;
|
||||||
|
private final AtomicInteger _selectorIndex = new AtomicInteger();
|
||||||
|
private final IntUnaryOperator _selectorIndexUpdate;
|
||||||
private long _connectTimeout = DEFAULT_CONNECT_TIMEOUT;
|
private long _connectTimeout = DEFAULT_CONNECT_TIMEOUT;
|
||||||
private long _selectorIndex;
|
|
||||||
private int _reservedThreads = -1;
|
private int _reservedThreads = -1;
|
||||||
private ThreadPoolBudget.Lease _lease;
|
private ThreadPoolBudget.Lease _lease;
|
||||||
|
|
||||||
|
@ -92,6 +94,7 @@ public abstract class SelectorManager extends ContainerLifeCycle implements Dump
|
||||||
this.executor = executor;
|
this.executor = executor;
|
||||||
this.scheduler = scheduler;
|
this.scheduler = scheduler;
|
||||||
_selectors = new ManagedSelector[selectors];
|
_selectors = new ManagedSelector[selectors];
|
||||||
|
_selectorIndexUpdate = index -> (index+1)%_selectors.length;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ManagedAttribute("The Executor")
|
@ManagedAttribute("The Executor")
|
||||||
|
@ -179,46 +182,7 @@ public abstract class SelectorManager extends ContainerLifeCycle implements Dump
|
||||||
|
|
||||||
private ManagedSelector chooseSelector(SelectableChannel channel)
|
private ManagedSelector chooseSelector(SelectableChannel channel)
|
||||||
{
|
{
|
||||||
// Ideally we would like to have all connections from the same client end
|
return _selectors[_selectorIndex.updateAndGet(_selectorIndexUpdate)];
|
||||||
// up on the same selector (to try to avoid smearing the data from a single
|
|
||||||
// client over all cores), but because of proxies, the remote address may not
|
|
||||||
// really be the client - so we have to hedge our bets to ensure that all
|
|
||||||
// channels don't end up on the one selector for a proxy.
|
|
||||||
ManagedSelector candidate1 = null;
|
|
||||||
if (channel != null)
|
|
||||||
{
|
|
||||||
try
|
|
||||||
{
|
|
||||||
if (channel instanceof SocketChannel)
|
|
||||||
{
|
|
||||||
SocketAddress remote = ((SocketChannel)channel).getRemoteAddress();
|
|
||||||
if (remote instanceof InetSocketAddress)
|
|
||||||
{
|
|
||||||
byte[] addr = ((InetSocketAddress)remote).getAddress().getAddress();
|
|
||||||
if (addr != null)
|
|
||||||
{
|
|
||||||
int s = addr[addr.length - 1] & 0xFF;
|
|
||||||
candidate1 = _selectors[s % getSelectorCount()];
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
catch (IOException x)
|
|
||||||
{
|
|
||||||
LOG.ignore(x);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// The ++ increment here is not atomic, but it does not matter,
|
|
||||||
// so long as the value changes sometimes, then connections will
|
|
||||||
// be distributed over the available selectors.
|
|
||||||
long s = _selectorIndex++;
|
|
||||||
int index = (int)(s % getSelectorCount());
|
|
||||||
ManagedSelector candidate2 = _selectors[index];
|
|
||||||
|
|
||||||
if (candidate1 == null || candidate1.size() >= candidate2.size() * 2)
|
|
||||||
return candidate2;
|
|
||||||
return candidate1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
Loading…
Reference in New Issue