From feab59df039d99a177bee4e4583be0cd8346c2c1 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Thu, 24 Jan 2019 14:09:03 +0100 Subject: [PATCH] Bubble exceptions up in ClusterApplierService (#37729) Exceptions thrown by the cluster applier service's settings and cluster appliers are bubbled up, and block the state from being applied instead of silently being ignored. In combination with the cluster state publishing lag detector, this will throw a node out of the cluster that can't properly apply cluster state updates. --- .../service/ClusterApplierService.java | 69 +++---- .../elasticsearch/ingest/IngestService.java | 10 +- .../coordination/CoordinatorTests.java | 174 ++++++++---------- .../service/ClusterApplierServiceTests.java | 96 ++++++++++ .../ingest/IngestServiceTests.java | 24 ++- .../MockSinglePrioritizingExecutor.java | 6 + 6 files changed, 234 insertions(+), 145 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java b/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java index fa3d4997efb..496ee9040a8 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java @@ -390,31 +390,24 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements newClusterState = task.apply(previousClusterState); } catch (Exception e) { TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS))); - if (logger.isTraceEnabled()) { - logger.trace(() -> new ParameterizedMessage( - "failed to execute cluster state applier in [{}], state:\nversion [{}], source [{}]\n{}{}{}", - executionTime, - previousClusterState.version(), - task.source, - previousClusterState.nodes(), - previousClusterState.routingTable(), - previousClusterState.getRoutingNodes()), - e); - } + logger.trace(() -> new ParameterizedMessage( + "failed to execute cluster state applier in [{}], state:\nversion [{}], source [{}]\n{}", + executionTime, previousClusterState.version(), task.source, previousClusterState), e); warnAboutSlowTaskIfNeeded(executionTime, task.source); task.listener.onFailure(task.source, e); return; } if (previousClusterState == newClusterState) { - task.listener.onSuccess(task.source); TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS))); logger.debug("processing [{}]: took [{}] no change in cluster state", task.source, executionTime); warnAboutSlowTaskIfNeeded(executionTime, task.source); + task.listener.onSuccess(task.source); } else { if (logger.isTraceEnabled()) { - logger.trace("cluster state updated, source [{}]\n{}", task.source, newClusterState); - } else if (logger.isDebugEnabled()) { + logger.debug("cluster state updated, version [{}], source [{}]\n{}", newClusterState.version(), task.source, + newClusterState); + } else { logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), task.source); } try { @@ -424,20 +417,19 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements executionTime, newClusterState.version(), newClusterState.stateUUID()); warnAboutSlowTaskIfNeeded(executionTime, task.source); + task.listener.onSuccess(task.source); } catch (Exception e) { TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS))); - final long version = newClusterState.version(); - final String stateUUID = newClusterState.stateUUID(); - final String fullState = newClusterState.toString(); - logger.warn(() -> new ParameterizedMessage( - "failed to apply updated cluster state in [{}]:\nversion [{}], uuid [{}], source [{}]\n{}", - executionTime, - version, - stateUUID, - task.source, - fullState), - e); - // TODO: do we want to call updateTask.onFailure here? + if (logger.isTraceEnabled()) { + logger.warn(new ParameterizedMessage( + "failed to apply updated cluster state in [{}]:\nversion [{}], uuid [{}], source [{}]\n{}", + executionTime, newClusterState.version(), newClusterState.stateUUID(), task.source, newClusterState), e); + } else { + logger.warn(new ParameterizedMessage( + "failed to apply updated cluster state in [{}]:\nversion [{}], uuid [{}], source [{}]", + executionTime, newClusterState.version(), newClusterState.stateUUID(), task.source), e); + } + task.listener.onFailure(task.source, e); } } } @@ -454,17 +446,14 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements } } + logger.trace("connecting to nodes of cluster state with version {}", newClusterState.version()); nodeConnectionsService.connectToNodes(newClusterState.nodes()); - logger.debug("applying cluster state version {}", newClusterState.version()); - try { - // nothing to do until we actually recover from the gateway or any other block indicates we need to disable persistency - if (clusterChangedEvent.state().blocks().disableStatePersistence() == false && clusterChangedEvent.metaDataChanged()) { - final Settings incomingSettings = clusterChangedEvent.state().metaData().settings(); - clusterSettings.applySettings(incomingSettings); - } - } catch (Exception ex) { - logger.warn("failed to apply cluster settings", ex); + // nothing to do until we actually recover from the gateway or any other block indicates we need to disable persistency + if (clusterChangedEvent.state().blocks().disableStatePersistence() == false && clusterChangedEvent.metaDataChanged()) { + logger.debug("applying settings from cluster state with version {}", newClusterState.version()); + final Settings incomingSettings = clusterChangedEvent.state().metaData().settings(); + clusterSettings.applySettings(incomingSettings); } logger.debug("apply cluster state with version {}", newClusterState.version()); @@ -476,18 +465,12 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements state.set(newClusterState); callClusterStateListeners(clusterChangedEvent); - - task.listener.onSuccess(task.source); } private void callClusterStateAppliers(ClusterChangedEvent clusterChangedEvent) { clusterStateAppliers.forEach(applier -> { - try { - logger.trace("calling [{}] with change to version [{}]", applier, clusterChangedEvent.state().version()); - applier.applyClusterState(clusterChangedEvent); - } catch (Exception ex) { - logger.warn("failed to notify ClusterStateApplier", ex); - } + logger.trace("calling [{}] with change to version [{}]", applier, clusterChangedEvent.state().version()); + applier.applyClusterState(clusterChangedEvent); }); } diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 6951e33d5e7..00b04bff2e5 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -19,6 +19,8 @@ package org.elasticsearch.ingest; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ResourceNotFoundException; @@ -69,6 +71,8 @@ public class IngestService implements ClusterStateApplier { public static final String NOOP_PIPELINE_NAME = "_none"; + private static final Logger logger = LogManager.getLogger(IngestService.class); + private final ClusterService clusterService; private final ScriptService scriptService; private final Map processorFactories; @@ -256,7 +260,11 @@ public class IngestService implements ClusterStateApplier { public void applyClusterState(final ClusterChangedEvent event) { ClusterState state = event.state(); Map originalPipelines = pipelines; - innerUpdatePipelines(event.previousState(), state); + try { + innerUpdatePipelines(event.previousState(), state); + } catch (ElasticsearchParseException e) { + logger.warn("failed to update ingest pipelines", e); + } //pipelines changed, so add the old metrics to the new metrics if (originalPipelines != pipelines) { pipelines.forEach((id, pipeline) -> { diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 7db63ab120e..be40f0c8883 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -26,10 +26,10 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterModule; -import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.ESAllocationTestCase; +import org.elasticsearch.cluster.NodeConnectionsService; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.coordination.ClusterStatePublisher.AckListener; import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration; @@ -39,7 +39,9 @@ import org.elasticsearch.cluster.coordination.CoordinatorTests.Cluster.ClusterNo import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode.Role; -import org.elasticsearch.cluster.service.ClusterApplier; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.service.ClusterApplierService; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.io.stream.BytesStreamOutput; @@ -52,6 +54,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings.Builder; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.discovery.zen.PublishClusterStateStats; import org.elasticsearch.discovery.zen.UnicastHostsProvider.HostsResolver; @@ -931,7 +934,7 @@ public class CoordinatorTests extends ESTestCase { cluster.runFor(defaultMillis(FOLLOWER_CHECK_TIMEOUT_SETTING) + defaultMillis(FOLLOWER_CHECK_INTERVAL_SETTING) + DEFAULT_CLUSTER_STATE_UPDATE_DELAY, "detecting disconnection"); - assertThat(leader.clusterApplier.lastAppliedClusterState.blocks().global(), hasItem(expectedBlock)); + assertThat(leader.getLastAppliedClusterState().blocks().global(), hasItem(expectedBlock)); // TODO reboot the leader and verify that the same block is applied when it restarts } @@ -1525,12 +1528,12 @@ public class CoordinatorTests extends ESTestCase { private Coordinator coordinator; private final DiscoveryNode localNode; private final MockPersistedState persistedState; - private FakeClusterApplier clusterApplier; private AckedFakeThreadPoolMasterService masterService; + private DisruptableClusterApplierService clusterApplierService; + private ClusterService clusterService; private TransportService transportService; private DisruptableMockTransport mockTransport; private List> extraJoinValidators = new ArrayList<>(); - private ClusterStateApplyResponse clusterStateApplyResponse = ClusterStateApplyResponse.SUCCEED; ClusterNode(int nodeIndex, boolean masterEligible) { this(nodeIndex, createDiscoveryNode(nodeIndex, masterEligible), defaultPersistedStateSupplier); @@ -1565,37 +1568,46 @@ public class CoordinatorTests extends ESTestCase { final Settings settings = Settings.builder() .putList(ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING.getKey(), ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING.get(Settings.EMPTY)).build(); // suppress auto-bootstrap - - final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - clusterApplier = new FakeClusterApplier(settings, clusterSettings); - masterService = new AckedFakeThreadPoolMasterService("test_node", "test", - runnable -> deterministicTaskQueue.scheduleNow(onNode(runnable))); transportService = mockTransport.createTransportService( settings, deterministicTaskQueue.getThreadPool(this::onNode), NOOP_TRANSPORT_INTERCEPTOR, a -> localNode, null, emptySet()); + masterService = new AckedFakeThreadPoolMasterService(localNode.getId(), "test", + runnable -> deterministicTaskQueue.scheduleNow(onNode(runnable))); + final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + clusterApplierService = new DisruptableClusterApplierService(localNode.getId(), settings, clusterSettings, + deterministicTaskQueue, this::onNode); + clusterService = new ClusterService(settings, clusterSettings, masterService, clusterApplierService); + clusterService.setNodeConnectionsService( + new NodeConnectionsService(clusterService.getSettings(), deterministicTaskQueue.getThreadPool(this::onNode), + transportService) { + @Override + public void connectToNodes(DiscoveryNodes discoveryNodes) { + // override this method as it does blocking calls + } + }); final Collection> onJoinValidators = Collections.singletonList((dn, cs) -> extraJoinValidators.forEach(validator -> validator.accept(dn, cs))); coordinator = new Coordinator("test_node", settings, clusterSettings, transportService, writableRegistry(), ESAllocationTestCase.createAllocationService(Settings.EMPTY), masterService, this::getPersistedState, - Cluster.this::provideUnicastHosts, clusterApplier, onJoinValidators, Randomness.get()); + Cluster.this::provideUnicastHosts, clusterApplierService, onJoinValidators, Randomness.get()); masterService.setClusterStatePublisher(coordinator); logger.trace("starting up [{}]", localNode); transportService.start(); transportService.acceptIncomingRequests(); - masterService.start(); coordinator.start(); + clusterService.start(); coordinator.startInitialJoin(); } void close() { logger.trace("taking down [{}]", localNode); - //transportService.stop(); // does blocking stuff :/ - masterService.stop(); coordinator.stop(); - //transportService.close(); // does blocking stuff :/ - masterService.close(); + clusterService.stop(); + //transportService.stop(); // does blocking stuff :/ + clusterService.close(); coordinator.close(); + //transportService.close(); // does blocking stuff :/ } ClusterNode restartedNode() { @@ -1634,11 +1646,11 @@ public class CoordinatorTests extends ESTestCase { } void setClusterStateApplyResponse(ClusterStateApplyResponse clusterStateApplyResponse) { - this.clusterStateApplyResponse = clusterStateApplyResponse; + clusterApplierService.clusterStateApplyResponse = clusterStateApplyResponse; } ClusterStateApplyResponse getClusterStateApplyResponse() { - return clusterStateApplyResponse; + return clusterApplierService.clusterStateApplyResponse; } Runnable onNode(Runnable runnable) { @@ -1739,7 +1751,7 @@ public class CoordinatorTests extends ESTestCase { } ClusterState getLastAppliedClusterState() { - return clusterApplier.lastAppliedClusterState; + return clusterApplierService.state(); } void applyInitialConfiguration() { @@ -1769,84 +1781,6 @@ public class CoordinatorTests extends ESTestCase { private boolean isNotUsefullyBootstrapped() { return getLocalNode().isMasterNode() == false || coordinator.isInitialConfigurationSet() == false; } - - private class FakeClusterApplier implements ClusterApplier { - - final ClusterName clusterName; - private final ClusterSettings clusterSettings; - ClusterState lastAppliedClusterState; - - private FakeClusterApplier(Settings settings, ClusterSettings clusterSettings) { - clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings); - this.clusterSettings = clusterSettings; - } - - @Override - public void setInitialState(ClusterState initialState) { - assert lastAppliedClusterState == null; - assert initialState != null; - lastAppliedClusterState = initialState; - } - - @Override - public void onNewClusterState(String source, Supplier clusterStateSupplier, ClusterApplyListener listener) { - switch (clusterStateApplyResponse) { - case SUCCEED: - deterministicTaskQueue.scheduleNow(onNode(new Runnable() { - @Override - public void run() { - final ClusterState oldClusterState = clusterApplier.lastAppliedClusterState; - final ClusterState newClusterState = clusterStateSupplier.get(); - assert oldClusterState.version() <= newClusterState.version() : "updating cluster state from version " - + oldClusterState.version() + " to stale version " + newClusterState.version(); - clusterApplier.lastAppliedClusterState = newClusterState; - final Settings incomingSettings = newClusterState.metaData().settings(); - clusterSettings.applySettings(incomingSettings); // TODO validation might throw exceptions here. - listener.onSuccess(source); - } - - @Override - public String toString() { - return "apply cluster state from [" + source + "]"; - } - })); - break; - case FAIL: - deterministicTaskQueue.scheduleNow(onNode(new Runnable() { - @Override - public void run() { - listener.onFailure(source, new ElasticsearchException("cluster state application failed")); - } - - @Override - public String toString() { - return "fail to apply cluster state from [" + source + "]"; - } - })); - break; - case HANG: - if (randomBoolean()) { - deterministicTaskQueue.scheduleNow(onNode(new Runnable() { - @Override - public void run() { - final ClusterState oldClusterState = clusterApplier.lastAppliedClusterState; - final ClusterState newClusterState = clusterStateSupplier.get(); - assert oldClusterState.version() <= newClusterState.version() : - "updating cluster state from version " - + oldClusterState.version() + " to stale version " + newClusterState.version(); - clusterApplier.lastAppliedClusterState = newClusterState; - } - - @Override - public String toString() { - return "apply cluster state from [" + source + "] without ack"; - } - })); - } - break; - } - } - } } private List provideUnicastHosts(HostsResolver ignored) { @@ -1938,6 +1872,52 @@ public class CoordinatorTests extends ESTestCase { } } + static class DisruptableClusterApplierService extends ClusterApplierService { + private final String nodeName; + private final DeterministicTaskQueue deterministicTaskQueue; + ClusterStateApplyResponse clusterStateApplyResponse = ClusterStateApplyResponse.SUCCEED; + + DisruptableClusterApplierService(String nodeName, Settings settings, ClusterSettings clusterSettings, + DeterministicTaskQueue deterministicTaskQueue, Function runnableWrapper) { + super(nodeName, settings, clusterSettings, deterministicTaskQueue.getThreadPool(runnableWrapper)); + this.nodeName = nodeName; + this.deterministicTaskQueue = deterministicTaskQueue; + addStateApplier(event -> { + switch (clusterStateApplyResponse) { + case SUCCEED: + case HANG: + final ClusterState oldClusterState = event.previousState(); + final ClusterState newClusterState = event.state(); + assert oldClusterState.version() <= newClusterState.version() : "updating cluster state from version " + + oldClusterState.version() + " to stale version " + newClusterState.version(); + break; + case FAIL: + throw new ElasticsearchException("simulated cluster state applier failure"); + } + }); + } + + @Override + protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() { + return new MockSinglePrioritizingExecutor(nodeName, deterministicTaskQueue); + } + + @Override + public void onNewClusterState(String source, Supplier clusterStateSupplier, ClusterApplyListener listener) { + if (clusterStateApplyResponse == ClusterStateApplyResponse.HANG) { + if (randomBoolean()) { + // apply cluster state, but don't notify listener + super.onNewClusterState(source, clusterStateSupplier, (source1, e) -> { + // ignore result + }); + } + } else { + super.onNewClusterState(source, clusterStateSupplier, listener); + } + } + + } + private static DiscoveryNode createDiscoveryNode(int nodeIndex, boolean masterEligible) { final TransportAddress address = buildNewFakeTransportAddress(); return new DiscoveryNode("", "node" + nodeIndex, diff --git a/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java index 2690909489c..770ae68e128 100644 --- a/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java @@ -28,8 +28,10 @@ import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.LocalNodeMasterListener; import org.elasticsearch.cluster.NodeConnectionsService; import org.elasticsearch.cluster.block.ClusterBlocks; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.elasticsearch.cluster.service.ClusterApplier.ClusterApplyListener; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.ClusterSettings; @@ -53,6 +55,7 @@ import java.util.concurrent.atomic.AtomicReference; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; import static org.elasticsearch.test.ClusterServiceUtils.setState; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.is; public class ClusterApplierServiceTests extends ESTestCase { @@ -357,6 +360,97 @@ public class ClusterApplierServiceTests extends ESTestCase { assertTrue(applierCalled.get()); } + public void testClusterStateApplierBubblesUpExceptionsInApplier() throws InterruptedException { + AtomicReference error = new AtomicReference<>(); + clusterApplierService.addStateApplier(event -> { + throw new RuntimeException("dummy exception"); + }); + + CountDownLatch latch = new CountDownLatch(1); + clusterApplierService.onNewClusterState("test", () -> ClusterState.builder(clusterApplierService.state()).build(), + new ClusterApplyListener() { + + @Override + public void onSuccess(String source) { + latch.countDown(); + fail("should not be called"); + } + + @Override + public void onFailure(String source, Exception e) { + assertTrue(error.compareAndSet(null, e)); + latch.countDown(); + } + } + ); + + latch.await(); + assertNotNull(error.get()); + assertThat(error.get().getMessage(), containsString("dummy exception")); + } + + public void testClusterStateApplierBubblesUpExceptionsInSettingsApplier() throws InterruptedException { + AtomicReference error = new AtomicReference<>(); + clusterApplierService.clusterSettings.addSettingsUpdateConsumer(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING, + v -> {}); + + CountDownLatch latch = new CountDownLatch(1); + clusterApplierService.onNewClusterState("test", () -> ClusterState.builder(clusterApplierService.state()) + .metaData(MetaData.builder(clusterApplierService.state().metaData()) + .persistentSettings( + Settings.builder().put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), false).build()) + .build()) + .build(), + new ClusterApplyListener() { + + @Override + public void onSuccess(String source) { + latch.countDown(); + fail("should not be called"); + } + + @Override + public void onFailure(String source, Exception e) { + assertTrue(error.compareAndSet(null, e)); + latch.countDown(); + } + } + ); + + latch.await(); + assertNotNull(error.get()); + assertThat(error.get().getMessage(), containsString("illegal value can't update")); + } + + public void testClusterStateApplierSwallowsExceptionInListener() throws InterruptedException { + AtomicReference error = new AtomicReference<>(); + AtomicBoolean applierCalled = new AtomicBoolean(); + clusterApplierService.addListener(event -> { + assertTrue(applierCalled.compareAndSet(false, true)); + throw new RuntimeException("dummy exception"); + }); + + CountDownLatch latch = new CountDownLatch(1); + clusterApplierService.onNewClusterState("test", () -> ClusterState.builder(clusterApplierService.state()).build(), + new ClusterApplyListener() { + + @Override + public void onSuccess(String source) { + latch.countDown(); + } + + @Override + public void onFailure(String source, Exception e) { + error.compareAndSet(null, e); + } + } + ); + + latch.await(); + assertNull(error.get()); + assertTrue(applierCalled.get()); + } + public void testClusterStateApplierCanCreateAnObserver() throws InterruptedException { AtomicReference error = new AtomicReference<>(); AtomicBoolean applierCalled = new AtomicBoolean(); @@ -407,10 +501,12 @@ public class ClusterApplierServiceTests extends ESTestCase { static class TimedClusterApplierService extends ClusterApplierService { + final ClusterSettings clusterSettings; public volatile Long currentTimeOverride = null; TimedClusterApplierService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) { super("test_node", settings, clusterSettings, threadPool); + this.clusterSettings = clusterSettings; } @Override diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index 3dde7babb0a..e5aea1f5d5c 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -19,6 +19,9 @@ package org.elasticsearch.ingest; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.lucene.util.SetOnce; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchParseException; @@ -39,11 +42,13 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.VersionType; import org.elasticsearch.plugins.IngestPlugin; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.MockLogAppender; import org.elasticsearch.threadpool.ThreadPool; import org.hamcrest.CustomTypeSafeMatcher; import org.mockito.ArgumentMatcher; @@ -254,7 +259,7 @@ public class IngestServiceTests extends ESTestCase { assertThat(pipeline.getProcessors().size(), equalTo(0)); } - public void testPutWithErrorResponse() { + public void testPutWithErrorResponse() throws IllegalAccessException { IngestService ingestService = createWithProcessors(); String id = "_id"; Pipeline pipeline = ingestService.getPipeline(id); @@ -265,11 +270,22 @@ public class IngestServiceTests extends ESTestCase { new PutPipelineRequest(id, new BytesArray("{\"description\": \"empty processors\"}"), XContentType.JSON); ClusterState previousClusterState = clusterState; clusterState = IngestService.innerPut(putRequest, clusterState); + MockLogAppender mockAppender = new MockLogAppender(); + mockAppender.start(); + mockAppender.addExpectation( + new MockLogAppender.SeenEventExpectation( + "test1", + IngestService.class.getCanonicalName(), + Level.WARN, + "failed to update ingest pipelines")); + Logger ingestLogger = LogManager.getLogger(IngestService.class); + Loggers.addAppender(ingestLogger, mockAppender); try { ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); - fail("should fail"); - } catch (ElasticsearchParseException e) { - assertThat(e.getMessage(), equalTo("[processors] required property is missing")); + mockAppender.assertAllExpectationsMatched(); + } finally { + Loggers.removeAppender(ingestLogger, mockAppender); + mockAppender.stop(); } pipeline = ingestService.getPipeline(id); assertNotNull(pipeline); diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/MockSinglePrioritizingExecutor.java b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/MockSinglePrioritizingExecutor.java index cc21fef5f55..bcc10f1521b 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/MockSinglePrioritizingExecutor.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/MockSinglePrioritizingExecutor.java @@ -53,6 +53,12 @@ public class MockSinglePrioritizingExecutor extends PrioritizedEsThreadPoolExecu throw new KillWorkerError(); } + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) { + // ensures we don't block + return false; + } + private static final class KillWorkerError extends Error { } }