[Transport] Introduced worker threads to prevent alien threads of entering a node.

Requests are handled by the worked thread pool of the target node instead of the generic thread pool of the source node.
Also this change is required in order to make GC disruption work with local transport. Previously the handling of the a request was performed on on a node that that was being GC disrupted, resulting in some actions being performed while GC was being simulated.
This commit is contained in:
Boaz Leskes 2014-07-31 16:59:20 +02:00
parent 966a55d21c
commit 26d90882e5
2 changed files with 24 additions and 6 deletions

View File

@ -33,6 +33,7 @@ import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.LocalTransportAddress; import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*; import org.elasticsearch.transport.*;
import org.elasticsearch.transport.support.TransportStatus; import org.elasticsearch.transport.support.TransportStatus;
@ -40,6 +41,8 @@ import org.elasticsearch.transport.support.TransportStatus;
import java.io.IOException; import java.io.IOException;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap; import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
@ -50,6 +53,7 @@ import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.new
public class LocalTransport extends AbstractLifecycleComponent<Transport> implements Transport { public class LocalTransport extends AbstractLifecycleComponent<Transport> implements Transport {
private final ThreadPool threadPool; private final ThreadPool threadPool;
private final ThreadPoolExecutor workers;
private final Version version; private final Version version;
private volatile TransportServiceAdapter transportServiceAdapter; private volatile TransportServiceAdapter transportServiceAdapter;
private volatile BoundTransportAddress boundAddress; private volatile BoundTransportAddress boundAddress;
@ -58,13 +62,20 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
private static final AtomicLong transportAddressIdGenerator = new AtomicLong(); private static final AtomicLong transportAddressIdGenerator = new AtomicLong();
private final ConcurrentMap<DiscoveryNode, LocalTransport> connectedNodes = newConcurrentMap(); private final ConcurrentMap<DiscoveryNode, LocalTransport> connectedNodes = newConcurrentMap();
public static final String TRANSPORT_LOCAL_ADDRESS = "transport.local_address"; public static final String TRANSPORT_LOCAL_ADDRESS = "transport.local.address";
public static final String TRANSPORT_LOCAL_WORKERS = "transport.local.workers";
public static final String TRANSPORT_LOCAL_QUEUE = "transport.local.queue";
@Inject @Inject
public LocalTransport(Settings settings, ThreadPool threadPool, Version version) { public LocalTransport(Settings settings, ThreadPool threadPool, Version version) {
super(settings); super(settings);
this.threadPool = threadPool; this.threadPool = threadPool;
this.version = version; this.version = version;
int workerCount = this.settings.getAsInt(TRANSPORT_LOCAL_WORKERS, EsExecutors.boundedNumberOfProcessors(settings));
int queueSize = this.settings.getAsInt(TRANSPORT_LOCAL_QUEUE, -1);
logger.debug("creating [{}] workers, queue_size [{}]", workerCount, queueSize);
this.workers = EsExecutors.newFixed(workerCount, queueSize, EsExecutors.daemonThreadFactory(this.settings, "local_transport"));
} }
@Override @Override
@ -106,6 +117,13 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
@Override @Override
protected void doClose() throws ElasticsearchException { protected void doClose() throws ElasticsearchException {
workers.shutdown();
try {
workers.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
workers.shutdownNow();
} }
@Override @Override
@ -185,7 +203,7 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
transportServiceAdapter.sent(data.length); transportServiceAdapter.sent(data.length);
threadPool.generic().execute(new Runnable() { targetTransport.workers().execute(new Runnable() {
@Override @Override
public void run() { public void run() {
targetTransport.messageReceived(data, action, LocalTransport.this, version, requestId); targetTransport.messageReceived(data, action, LocalTransport.this, version, requestId);
@ -193,8 +211,8 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
}); });
} }
ThreadPool threadPool() { ThreadPoolExecutor workers() {
return this.threadPool; return this.workers;
} }
protected void messageReceived(byte[] data, String action, LocalTransport sourceTransport, Version version, @Nullable final Long sendRequestId) { protected void messageReceived(byte[] data, String action, LocalTransport sourceTransport, Version version, @Nullable final Long sendRequestId) {

View File

@ -72,7 +72,7 @@ public class LocalTransportChannel implements TransportChannel {
response.writeTo(stream); response.writeTo(stream);
stream.close(); stream.close();
final byte[] data = bStream.bytes().toBytes(); final byte[] data = bStream.bytes().toBytes();
targetTransport.threadPool().generic().execute(new Runnable() { targetTransport.workers().execute(new Runnable() {
@Override @Override
public void run() { public void run() {
targetTransport.messageReceived(data, action, sourceTransport, version, null); targetTransport.messageReceived(data, action, sourceTransport, version, null);
@ -98,7 +98,7 @@ public class LocalTransportChannel implements TransportChannel {
too.close(); too.close();
} }
final byte[] data = stream.bytes().toBytes(); final byte[] data = stream.bytes().toBytes();
targetTransport.threadPool().generic().execute(new Runnable() { targetTransport.workers().execute(new Runnable() {
@Override @Override
public void run() { public void run() {
targetTransport.messageReceived(data, action, sourceTransport, version, null); targetTransport.messageReceived(data, action, sourceTransport, version, null);