assert blocking calls are not made on the cluster state update thread

This commit adds an assertion to ensure that we do not introduce blocking calls in code
that is called in a ClusterStateListener or another part of the cluster state update process.
This commit is contained in:
Jay Modi 2016-11-14 14:30:01 -05:00 committed by GitHub
commit 87d76c3ff8
3 changed files with 72 additions and 2 deletions

View File

@ -539,6 +539,13 @@ public class ClusterService extends AbstractLifecycleComponent {
return true;
}
/**
 * Asserts that the calling thread is <b>NOT</b> the cluster state update thread, identified by its
 * name containing {@code UPDATE_THREAD_NAME}.
 *
 * @param reason explanation included in the assertion message when the check fails
 * @return always {@code true}, so the call can itself be used as the condition of an {@code assert}
 */
public static boolean assertNotClusterStateUpdateThread(String reason) {
    final Thread current = Thread.currentThread();
    final boolean onUpdateThread = current.getName().contains(UPDATE_THREAD_NAME);
    assert onUpdateThread == false :
        "Expected current thread [" + current + "] to not be the cluster state update thread. Reason: [" + reason + "]";
    return true;
}
/**
 * Returns the {@code ClusterName} held by this service.
 *
 * @return the value of the {@code clusterName} field
 */
public ClusterName getClusterName() {
    return this.clusterName;
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.common.util.concurrent;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transports;
@ -60,7 +61,9 @@ public abstract class BaseFuture<V> implements Future<V> {
public V get(long timeout, TimeUnit unit) throws InterruptedException,
TimeoutException, ExecutionException {
// A non-positive timeout short-circuits the thread checks below. Otherwise every assertNot*
// helper is invoked with BLOCKING_OP_REASON before the timed wait is attempted.
// NOTE(review): the first parenthesised condition after this assert appears to be the pre-change
// form shown by the diff; the three lines following it are its replacement, which additionally
// checks ClusterService.assertNotClusterStateUpdateThread.
assert timeout <= 0 ||
(Transports.assertNotTransportThread(BLOCKING_OP_REASON) && ThreadPool.assertNotScheduleThread(BLOCKING_OP_REASON));
(Transports.assertNotTransportThread(BLOCKING_OP_REASON) &&
ThreadPool.assertNotScheduleThread(BLOCKING_OP_REASON) &&
ClusterService.assertNotClusterStateUpdateThread(BLOCKING_OP_REASON));
// The timeout is converted to nanoseconds and the wait is delegated to sync.
return sync.get(unit.toNanos(timeout));
}
@ -82,7 +85,9 @@ public abstract class BaseFuture<V> implements Future<V> {
*/
@Override
public V get() throws InterruptedException, ExecutionException {
// The untimed wait is always guarded: every assertNot* helper is invoked with BLOCKING_OP_REASON.
// NOTE(review): the single-line assert directly below appears to be the pre-change form shown by
// the diff; the three-line assert after it is its replacement, which additionally checks
// ClusterService.assertNotClusterStateUpdateThread.
assert Transports.assertNotTransportThread(BLOCKING_OP_REASON) && ThreadPool.assertNotScheduleThread(BLOCKING_OP_REASON);
assert Transports.assertNotTransportThread(BLOCKING_OP_REASON) &&
ThreadPool.assertNotScheduleThread(BLOCKING_OP_REASON) &&
ClusterService.assertNotClusterStateUpdateThread(BLOCKING_OP_REASON);
// Delegates the wait to sync.
return sync.get();
}

View File

@ -41,6 +41,7 @@ import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.BaseFuture;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.test.ESTestCase;
@ -68,8 +69,10 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import static java.util.Collections.emptyMap;
@ -303,6 +306,61 @@ public class ClusterServiceTests extends ESTestCase {
assertTrue(published.get());
}
/**
 * Verifies that a blocking {@code BaseFuture#get} call made from inside
 * {@code ClusterStateTaskListener#clusterStateProcessed} trips the
 * "not the cluster state update thread" assertion.
 *
 * The test is skipped when assertions are disabled for {@code BaseFuture}, since the check under
 * test is implemented with {@code assert}.
 *
 * @throws InterruptedException if the test thread is interrupted while waiting for the listener
 */
public void testBlockingCallInClusterStateTaskListenerFails() throws InterruptedException {
    assumeTrue("assertions must be enabled for this test to work", BaseFuture.class.desiredAssertionStatus());
    final CountDownLatch latch = new CountDownLatch(1);
    final AtomicReference<AssertionError> assertionRef = new AtomicReference<>();

    clusterService.submitStateUpdateTask(
        "testBlockingCallInClusterStateTaskListenerFails",
        new Object(),
        ClusterStateTaskConfig.build(Priority.NORMAL),
        new ClusterStateTaskExecutor<Object>() {
            @Override
            public boolean runOnlyOnMaster() {
                return false;
            }

            @Override
            public BatchResult<Object> execute(ClusterState currentState, List<Object> tasks) throws Exception {
                // Build a new (but equivalent) state and mark every task as successful.
                ClusterState newClusterState = ClusterState.builder(currentState).build();
                return BatchResult.builder().successes(tasks).build(newClusterState);
            }

            @Override
            public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {
                assertNotNull(assertionRef.get());
            }
        },
        new ClusterStateTaskListener() {
            @Override
            public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
                // A future that is never completed: either get() variant would block if it were allowed to run.
                BaseFuture<Void> future = new BaseFuture<Void>() {};
                try {
                    if (randomBoolean()) {
                        future.get(1L, TimeUnit.SECONDS);
                    } else {
                        future.get();
                    }
                } catch (Exception e) {
                    throw new RuntimeException(e);
                } catch (AssertionError e) {
                    // Expected path: capture the error so the test thread can inspect its message.
                    assertionRef.set(e);
                } finally {
                    // Always release the test thread. Previously the latch was only counted down when an
                    // AssertionError was caught, so any other outcome (for example the timed get() timing
                    // out) left latch.await() blocked forever instead of failing the assertions below.
                    latch.countDown();
                }
            }

            @Override
            public void onFailure(String source, Exception e) {
            }
        }
    );

    latch.await();
    assertNotNull(assertionRef.get());
    assertThat(assertionRef.get().getMessage(), containsString("not be the cluster state update thread. Reason: [Blocking operation]"));
}
public void testOneExecutorDontStarveAnother() throws InterruptedException {
final List<String> executionOrder = Collections.synchronizedList(new ArrayList<>());
final Semaphore allowProcessing = new Semaphore(0);