diff --git a/core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java b/core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java index a98f88cab20..ce5c0f3e258 100644 --- a/core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java +++ b/core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java @@ -539,6 +539,13 @@ public class ClusterService extends AbstractLifecycleComponent { return true; } + /** asserts that the current thread is NOT the cluster state update thread */ + public static boolean assertNotClusterStateUpdateThread(String reason) { + assert Thread.currentThread().getName().contains(UPDATE_THREAD_NAME) == false : + "Expected current thread [" + Thread.currentThread() + "] to not be the cluster state update thread. Reason: [" + reason + "]"; + return true; + } + public ClusterName getClusterName() { return clusterName; } diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/BaseFuture.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/BaseFuture.java index c3e60ec5be3..ee9aea9ed70 100644 --- a/core/src/main/java/org/elasticsearch/common/util/concurrent/BaseFuture.java +++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/BaseFuture.java @@ -19,6 +19,7 @@ package org.elasticsearch.common.util.concurrent; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.Transports; @@ -60,7 +61,9 @@ public abstract class BaseFuture implements Future { public V get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException, ExecutionException { assert timeout <= 0 || - (Transports.assertNotTransportThread(BLOCKING_OP_REASON) && ThreadPool.assertNotScheduleThread(BLOCKING_OP_REASON)); + (Transports.assertNotTransportThread(BLOCKING_OP_REASON) && + ThreadPool.assertNotScheduleThread(BLOCKING_OP_REASON) && + ClusterService.assertNotClusterStateUpdateThread(BLOCKING_OP_REASON)); return sync.get(unit.toNanos(timeout)); } @@ -82,7 +85,9 @@ public abstract class BaseFuture implements Future { */ @Override public V get() throws InterruptedException, ExecutionException { - assert Transports.assertNotTransportThread(BLOCKING_OP_REASON) && ThreadPool.assertNotScheduleThread(BLOCKING_OP_REASON); + assert Transports.assertNotTransportThread(BLOCKING_OP_REASON) && + ThreadPool.assertNotScheduleThread(BLOCKING_OP_REASON) && + ClusterService.assertNotClusterStateUpdateThread(BLOCKING_OP_REASON); return sync.get(); } diff --git a/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceTests.java b/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceTests.java index 1ea6853ee7c..9fd4fc18514 100644 --- a/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceTests.java @@ -41,6 +41,7 @@ import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.BaseFuture; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.discovery.Discovery; import org.elasticsearch.test.ESTestCase; @@ -68,8 +69,10 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import static java.util.Collections.emptyMap; @@ -303,6 +306,61 @@ public class ClusterServiceTests extends ESTestCase { assertTrue(published.get()); } + public void testBlockingCallInClusterStateTaskListenerFails() throws InterruptedException { + assumeTrue("assertions must be enabled for this test to work", BaseFuture.class.desiredAssertionStatus()); + final CountDownLatch latch = new CountDownLatch(1); + final AtomicReference assertionRef = new AtomicReference<>(); + + clusterService.submitStateUpdateTask( + "testBlockingCallInClusterStateTaskListenerFails", + new Object(), + ClusterStateTaskConfig.build(Priority.NORMAL), + new ClusterStateTaskExecutor() { + @Override + public boolean runOnlyOnMaster() { + return false; + } + + @Override + public BatchResult execute(ClusterState currentState, List tasks) throws Exception { + ClusterState newClusterState = ClusterState.builder(currentState).build(); + return BatchResult.builder().successes(tasks).build(newClusterState); + } + + @Override + public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) { + assertNotNull(assertionRef.get()); + } + }, + new ClusterStateTaskListener() { + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + BaseFuture future = new BaseFuture() {}; + try { + if (randomBoolean()) { + future.get(1L, TimeUnit.SECONDS); + } else { + future.get(); + } + } catch (Exception e) { + throw new RuntimeException(e); + } catch (AssertionError e) { + assertionRef.set(e); + latch.countDown(); + } + } + + @Override + public void onFailure(String source, Exception e) { + } + } + ); + + latch.await(); + assertNotNull(assertionRef.get()); + assertThat(assertionRef.get().getMessage(), containsString("not be the cluster state update thread. Reason: [Blocking operation]")); + } + public void testOneExecutorDontStarveAnother() throws InterruptedException { final List executionOrder = Collections.synchronizedList(new ArrayList<>()); final Semaphore allowProcessing = new Semaphore(0);