assert blocking calls are not made on the cluster state update thread

This commit adds an assertion to ensure that we do not introduce blocking calls in code
that is called in a ClusterStateListener or another part of the cluster state update process.
This commit is contained in:
jaymode 2016-11-07 10:12:05 -05:00
parent 871a077ac4
commit 3cad92a631
No known key found for this signature in database
GPG Key ID: D859847567B3493D
3 changed files with 72 additions and 2 deletions

View File

@ -495,6 +495,13 @@ public class ClusterService extends AbstractLifecycleComponent {
return true; return true;
} }
/** asserts that the current thread is <b>NOT</b> the cluster state update thread */
public static boolean assertNotClusterStateUpdateThread(String reason) {
assert Thread.currentThread().getName().contains(UPDATE_THREAD_NAME) == false :
"Expected current thread [" + Thread.currentThread() + "] to not be the cluster state update thread. Reason: [" + reason + "]";
return true;
}
public ClusterName getClusterName() { public ClusterName getClusterName() {
return clusterName; return clusterName;
} }

View File

@ -19,6 +19,7 @@
package org.elasticsearch.common.util.concurrent; package org.elasticsearch.common.util.concurrent;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transports; import org.elasticsearch.transport.Transports;
@ -60,7 +61,9 @@ public abstract class BaseFuture<V> implements Future<V> {
public V get(long timeout, TimeUnit unit) throws InterruptedException, public V get(long timeout, TimeUnit unit) throws InterruptedException,
TimeoutException, ExecutionException { TimeoutException, ExecutionException {
assert timeout <= 0 || assert timeout <= 0 ||
(Transports.assertNotTransportThread(BLOCKING_OP_REASON) && ThreadPool.assertNotScheduleThread(BLOCKING_OP_REASON)); (Transports.assertNotTransportThread(BLOCKING_OP_REASON) &&
ThreadPool.assertNotScheduleThread(BLOCKING_OP_REASON) &&
ClusterService.assertNotClusterStateUpdateThread(BLOCKING_OP_REASON));
return sync.get(unit.toNanos(timeout)); return sync.get(unit.toNanos(timeout));
} }
@ -82,7 +85,9 @@ public abstract class BaseFuture<V> implements Future<V> {
*/ */
@Override @Override
public V get() throws InterruptedException, ExecutionException { public V get() throws InterruptedException, ExecutionException {
assert Transports.assertNotTransportThread(BLOCKING_OP_REASON) && ThreadPool.assertNotScheduleThread(BLOCKING_OP_REASON); assert Transports.assertNotTransportThread(BLOCKING_OP_REASON) &&
ThreadPool.assertNotScheduleThread(BLOCKING_OP_REASON) &&
ClusterService.assertNotClusterStateUpdateThread(BLOCKING_OP_REASON);
return sync.get(); return sync.get();
} }

View File

@ -41,6 +41,7 @@ import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.BaseFuture;
import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
@ -68,8 +69,10 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier; import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static java.util.Collections.emptyMap; import static java.util.Collections.emptyMap;
@ -303,6 +306,61 @@ public class ClusterServiceTests extends ESTestCase {
assertTrue(published.get()); assertTrue(published.get());
} }
public void testBlockingCallInClusterStateTaskListenerFails() throws InterruptedException {
assumeTrue("assertions must be enabled for this test to work", BaseFuture.class.desiredAssertionStatus());
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<AssertionError> assertionRef = new AtomicReference<>();
clusterService.submitStateUpdateTask(
"testBlockingCallInClusterStateTaskListenerFails",
new Object(),
ClusterStateTaskConfig.build(Priority.NORMAL),
new ClusterStateTaskExecutor<Object>() {
@Override
public boolean runOnlyOnMaster() {
return false;
}
@Override
public BatchResult<Object> execute(ClusterState currentState, List<Object> tasks) throws Exception {
ClusterState newClusterState = ClusterState.builder(currentState).build();
return BatchResult.builder().successes(tasks).build(newClusterState);
}
@Override
public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {
assertNotNull(assertionRef.get());
}
},
new ClusterStateTaskListener() {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
BaseFuture<Void> future = new BaseFuture<Void>() {};
try {
if (randomBoolean()) {
future.get(1L, TimeUnit.SECONDS);
} else {
future.get();
}
} catch (Exception e) {
throw new RuntimeException(e);
} catch (AssertionError e) {
assertionRef.set(e);
latch.countDown();
}
}
@Override
public void onFailure(String source, Exception e) {
}
}
);
latch.await();
assertNotNull(assertionRef.get());
assertThat(assertionRef.get().getMessage(), containsString("not be the cluster state update thread. Reason: [Blocking operation]"));
}
public void testOneExecutorDontStarveAnother() throws InterruptedException { public void testOneExecutorDontStarveAnother() throws InterruptedException {
final List<String> executionOrder = Collections.synchronizedList(new ArrayList<>()); final List<String> executionOrder = Collections.synchronizedList(new ArrayList<>());
final Semaphore allowProcessing = new Semaphore(0); final Semaphore allowProcessing = new Semaphore(0);