Internal: assert that we do not call blocking code from transport threads.

This currently only adds checks to BaseFuture, but this should already cover
lots of client code. We could add more in the future, like interactions with
the filesystem and so on.

Close #9164
This commit is contained in:
Adrien Grand 2015-01-06 17:09:32 +01:00
parent 20f7be378b
commit fda727e20c
5 changed files with 108 additions and 11 deletions

View File

@ -20,7 +20,9 @@
package org.elasticsearch.common.util.concurrent;
import com.google.common.annotations.Beta;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.transport.Transports;
import java.util.concurrent.*;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
@ -62,7 +64,8 @@ import static com.google.common.base.Preconditions.checkNotNull;
* @author Sven Mawson
* @since 1.0
*/
// Same as AbstractFuture from Guava, but without the listeners
// Same as AbstractFuture from Guava, but without the listeners and with
// additional assertions
public abstract class BaseFuture<V> implements Future<V> {
/**
@ -89,6 +92,7 @@ public abstract class BaseFuture<V> implements Future<V> {
@Override
public V get(long timeout, TimeUnit unit) throws InterruptedException,
TimeoutException, ExecutionException {
Transports.assertNotTransportThread("Blocking operation");
return sync.get(unit.toNanos(timeout));
}
@ -110,6 +114,7 @@ public abstract class BaseFuture<V> implements Future<V> {
*/
@Override
public V get() throws InterruptedException, ExecutionException {
Transports.assertNotTransportThread("Blocking operation");
return sync.get();
}

View File

@ -0,0 +1,59 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport;
import org.elasticsearch.transport.local.LocalTransport;
import org.elasticsearch.transport.netty.NettyTransport;
import java.util.Arrays;
public enum Transports {
;
/**
* Utility method to detect whether a thread is a network thread. Typically
* used in assertions to make sure that we do not call blocking code from
* networking threads.
*/
public static final boolean isTransportThread(Thread t) {
final String threadName = t.getName();
for (String s : Arrays.asList(
LocalTransport.LOCAL_TRANSPORT_THREAD_NAME_PREFIX,
NettyTransport.HTTP_SERVER_BOSS_THREAD_NAME_PREFIX,
NettyTransport.HTTP_SERVER_WORKER_THREAD_NAME_PREFIX,
NettyTransport.TRANSPORT_CLIENT_WORKER_THREAD_NAME_PREFIX,
NettyTransport.TRANSPORT_CLIENT_BOSS_THREAD_NAME_PREFIX)) {
if (threadName.contains(s)) {
return true;
}
}
return false;
}
public static void assertTransportThread() {
final Thread t = Thread.currentThread();
assert isTransportThread(t) : "Expected transport thread but got [" + t + "]";
}
public static void assertNotTransportThread(String reason) {
final Thread t = Thread.currentThread();
assert isTransportThread(t) ==false : "Expected current thread [" + t + "] to not be a transport thread. Reason: ";
}
}

View File

@ -27,7 +27,12 @@ import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.ThrowableObjectInputStream;
import org.elasticsearch.common.io.stream.*;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.CachedStreamInput;
import org.elasticsearch.common.io.stream.HandlesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.LocalTransportAddress;
@ -35,12 +40,27 @@ import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import org.elasticsearch.transport.ActionNotFoundTransportException;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.NodeNotConnectedException;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.ResponseHandlerFailureTransportException;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportSerializationException;
import org.elasticsearch.transport.TransportServiceAdapter;
import org.elasticsearch.transport.Transports;
import org.elasticsearch.transport.support.TransportStatus;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@ -52,6 +72,8 @@ import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.new
*/
public class LocalTransport extends AbstractLifecycleComponent<Transport> implements Transport {
public static final String LOCAL_TRANSPORT_THREAD_NAME_PREFIX = "local_transport";
private final ThreadPool threadPool;
private final ThreadPoolExecutor workers;
private final Version version;
@ -75,7 +97,8 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
int workerCount = this.settings.getAsInt(TRANSPORT_LOCAL_WORKERS, EsExecutors.boundedNumberOfProcessors(settings));
int queueSize = this.settings.getAsInt(TRANSPORT_LOCAL_QUEUE, -1);
logger.debug("creating [{}] workers, queue_size [{}]", workerCount, queueSize);
this.workers = EsExecutors.newFixed(workerCount, queueSize, EsExecutors.daemonThreadFactory(this.settings, "local_transport"));
final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(this.settings, LOCAL_TRANSPORT_THREAD_NAME_PREFIX);
this.workers = EsExecutors.newFixed(workerCount, queueSize, threadFactory);
}
@Override
@ -210,6 +233,7 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
}
protected void messageReceived(byte[] data, String action, LocalTransport sourceTransport, Version version, @Nullable final Long sendRequestId) {
Transports.assertTransportThread();
try {
transportServiceAdapter.received(data.length);
StreamInput stream = new BytesStreamInput(data, false);

View File

@ -65,6 +65,7 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
Transports.assertTransportThread();
Object m = e.getMessage();
if (!(m instanceof ChannelBuffer)) {
ctx.sendUpstream(e);

View File

@ -22,6 +22,7 @@ package org.elasticsearch.transport.netty;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.elasticsearch.*;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Booleans;
@ -98,6 +99,11 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
NettyUtils.setup();
}
public static final String HTTP_SERVER_WORKER_THREAD_NAME_PREFIX = "http_server_worker";
public static final String HTTP_SERVER_BOSS_THREAD_NAME_PREFIX = "http_server_boss";
public static final String TRANSPORT_CLIENT_WORKER_THREAD_NAME_PREFIX = "transport_client_worker";
public static final String TRANSPORT_CLIENT_BOSS_THREAD_NAME_PREFIX = "transport_client_boss";
public static final String WORKER_COUNT = "transport.netty.worker_count";
public static final String CONNECTIONS_PER_NODE_RECOVERY = "transport.connections_per_node.recovery";
public static final String CONNECTIONS_PER_NODE_BULK = "transport.connections_per_node.bulk";
@ -313,13 +319,13 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
private ClientBootstrap createClientBootstrap() {
if (blockingClient) {
clientBootstrap = new ClientBootstrap(new OioClientSocketChannelFactory(Executors.newCachedThreadPool(daemonThreadFactory(settings, "transport_client_worker"))));
clientBootstrap = new ClientBootstrap(new OioClientSocketChannelFactory(Executors.newCachedThreadPool(daemonThreadFactory(settings, TRANSPORT_CLIENT_WORKER_THREAD_NAME_PREFIX))));
} else {
int bossCount = componentSettings.getAsInt("boss_count", 1);
clientBootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(daemonThreadFactory(settings, "transport_client_boss")),
Executors.newCachedThreadPool(daemonThreadFactory(settings, TRANSPORT_CLIENT_BOSS_THREAD_NAME_PREFIX)),
bossCount,
new NioWorkerPool(Executors.newCachedThreadPool(daemonThreadFactory(settings, "transport_client_worker")), workerCount),
new NioWorkerPool(Executors.newCachedThreadPool(daemonThreadFactory(settings, TRANSPORT_CLIENT_WORKER_THREAD_NAME_PREFIX)), workerCount),
new HashedWheelTimer(daemonThreadFactory(settings, "transport_client_timer"))));
}
clientBootstrap.setPipelineFactory(configureClientChannelPipelineFactory());
@ -443,16 +449,18 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
logger.debug("using profile[{}], worker_count[{}], port[{}], bind_host[{}], publish_host[{}], compress[{}], connect_timeout[{}], connections_per_node[{}/{}/{}/{}/{}], receive_predictor[{}->{}]",
name, workerCount, port, bindHost, publishHost, compress, connectTimeout, connectionsPerNodeRecovery, connectionsPerNodeBulk, connectionsPerNodeReg, connectionsPerNodeState, connectionsPerNodePing, receivePredictorMin, receivePredictorMax);
final ThreadFactory bossFactory = daemonThreadFactory(this.settings, HTTP_SERVER_BOSS_THREAD_NAME_PREFIX, name);
final ThreadFactory workerFactory = daemonThreadFactory(this.settings, HTTP_SERVER_WORKER_THREAD_NAME_PREFIX, name);
ServerBootstrap serverBootstrap;
if (blockingServer) {
serverBootstrap = new ServerBootstrap(new OioServerSocketChannelFactory(
Executors.newCachedThreadPool(daemonThreadFactory(this.settings, "transport_server_boss", name)),
Executors.newCachedThreadPool(daemonThreadFactory(this.settings, "transport_server_worker", name))
Executors.newCachedThreadPool(bossFactory),
Executors.newCachedThreadPool(workerFactory)
));
} else {
serverBootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(daemonThreadFactory(this.settings, "transport_server_boss", name)),
Executors.newCachedThreadPool(daemonThreadFactory(this.settings, "transport_server_worker", name)),
Executors.newCachedThreadPool(bossFactory),
Executors.newCachedThreadPool(workerFactory),
workerCount));
}
serverBootstrap.setPipelineFactory(configureServerChannelPipelineFactory(name, settings));