Bubble exceptions up in ClusterApplierService (#37729)

Exceptions thrown by the cluster applier service's settings appliers and cluster state appliers are now bubbled up and block the state from being applied, instead of being silently ignored. In combination with the cluster state publishing lag detector, this throws a node out of the cluster if it cannot properly apply cluster state updates.
Yannick Welsch 2019-01-24 14:09:03 +01:00 committed by GitHub
parent 9357929309
commit feab59df03
6 changed files with 234 additions and 145 deletions
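
To illustrate the behavioural change before reading the diffs, here is a minimal standalone sketch. The class and method names below are illustrative stand-ins, not the actual Elasticsearch API: previously a failing applier was caught and logged and the state was still reported as applied; now the first failure aborts the application and reaches the listener's onFailure, which the lag detector can act on.

import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: simplified stand-ins for ClusterApplierService,
// ClusterStateApplier and ClusterApplyListener.
class ApplierLoopSketch {
    interface StateApplier { void apply(String newState) throws Exception; }
    interface ApplyListener { void onSuccess(); void onFailure(Exception e); }

    private final List<StateApplier> appliers = new ArrayList<>();

    void addApplier(StateApplier applier) { appliers.add(applier); }

    // Old behaviour: each applier wrapped in its own try/catch; failures were
    // only logged, so the caller always observed a successful application.
    void applyOldBehaviour(String newState, ApplyListener listener) {
        for (StateApplier applier : appliers) {
            try {
                applier.apply(newState);
            } catch (Exception e) {
                // logged and swallowed; state still treated as applied
            }
        }
        listener.onSuccess();
    }

    // New behaviour: the first applier failure aborts the application and is
    // reported to the listener, so a node that cannot apply a cluster state
    // fails visibly instead of silently drifting out of sync.
    void applyNewBehaviour(String newState, ApplyListener listener) {
        try {
            for (StateApplier applier : appliers) {
                applier.apply(newState);
            }
        } catch (Exception e) {
            listener.onFailure(e);
            return;
        }
        listener.onSuccess();
    }
}

The new testClusterStateApplierBubblesUpExceptionsInApplier test in ClusterApplierServiceTests below exercises exactly this path against the real service.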

ClusterApplierService.java

@@ -390,31 +390,24 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
newClusterState = task.apply(previousClusterState);
} catch (Exception e) {
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
if (logger.isTraceEnabled()) {
logger.trace(() -> new ParameterizedMessage(
"failed to execute cluster state applier in [{}], state:\nversion [{}], source [{}]\n{}{}{}",
executionTime,
previousClusterState.version(),
task.source,
previousClusterState.nodes(),
previousClusterState.routingTable(),
previousClusterState.getRoutingNodes()),
e);
}
logger.trace(() -> new ParameterizedMessage(
"failed to execute cluster state applier in [{}], state:\nversion [{}], source [{}]\n{}",
executionTime, previousClusterState.version(), task.source, previousClusterState), e);
warnAboutSlowTaskIfNeeded(executionTime, task.source);
task.listener.onFailure(task.source, e);
return;
}
if (previousClusterState == newClusterState) {
task.listener.onSuccess(task.source);
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
logger.debug("processing [{}]: took [{}] no change in cluster state", task.source, executionTime);
warnAboutSlowTaskIfNeeded(executionTime, task.source);
task.listener.onSuccess(task.source);
} else {
if (logger.isTraceEnabled()) {
logger.trace("cluster state updated, source [{}]\n{}", task.source, newClusterState);
} else if (logger.isDebugEnabled()) {
logger.debug("cluster state updated, version [{}], source [{}]\n{}", newClusterState.version(), task.source,
newClusterState);
} else {
logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), task.source);
}
try {
@@ -424,20 +417,19 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
executionTime, newClusterState.version(),
newClusterState.stateUUID());
warnAboutSlowTaskIfNeeded(executionTime, task.source);
task.listener.onSuccess(task.source);
} catch (Exception e) {
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
final long version = newClusterState.version();
final String stateUUID = newClusterState.stateUUID();
final String fullState = newClusterState.toString();
logger.warn(() -> new ParameterizedMessage(
"failed to apply updated cluster state in [{}]:\nversion [{}], uuid [{}], source [{}]\n{}",
executionTime,
version,
stateUUID,
task.source,
fullState),
e);
// TODO: do we want to call updateTask.onFailure here?
if (logger.isTraceEnabled()) {
logger.warn(new ParameterizedMessage(
"failed to apply updated cluster state in [{}]:\nversion [{}], uuid [{}], source [{}]\n{}",
executionTime, newClusterState.version(), newClusterState.stateUUID(), task.source, newClusterState), e);
} else {
logger.warn(new ParameterizedMessage(
"failed to apply updated cluster state in [{}]:\nversion [{}], uuid [{}], source [{}]",
executionTime, newClusterState.version(), newClusterState.stateUUID(), task.source), e);
}
task.listener.onFailure(task.source, e);
}
}
}
@@ -454,17 +446,14 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
}
}
logger.trace("connecting to nodes of cluster state with version {}", newClusterState.version());
nodeConnectionsService.connectToNodes(newClusterState.nodes());
logger.debug("applying cluster state version {}", newClusterState.version());
try {
// nothing to do until we actually recover from the gateway or any other block indicates we need to disable persistency
if (clusterChangedEvent.state().blocks().disableStatePersistence() == false && clusterChangedEvent.metaDataChanged()) {
final Settings incomingSettings = clusterChangedEvent.state().metaData().settings();
clusterSettings.applySettings(incomingSettings);
}
} catch (Exception ex) {
logger.warn("failed to apply cluster settings", ex);
// nothing to do until we actually recover from the gateway or any other block indicates we need to disable persistency
if (clusterChangedEvent.state().blocks().disableStatePersistence() == false && clusterChangedEvent.metaDataChanged()) {
logger.debug("applying settings from cluster state with version {}", newClusterState.version());
final Settings incomingSettings = clusterChangedEvent.state().metaData().settings();
clusterSettings.applySettings(incomingSettings);
}
logger.debug("apply cluster state with version {}", newClusterState.version());
@@ -476,18 +465,12 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
state.set(newClusterState);
callClusterStateListeners(clusterChangedEvent);
task.listener.onSuccess(task.source);
}
private void callClusterStateAppliers(ClusterChangedEvent clusterChangedEvent) {
clusterStateAppliers.forEach(applier -> {
try {
logger.trace("calling [{}] with change to version [{}]", applier, clusterChangedEvent.state().version());
applier.applyClusterState(clusterChangedEvent);
} catch (Exception ex) {
logger.warn("failed to notify ClusterStateApplier", ex);
}
logger.trace("calling [{}] with change to version [{}]", applier, clusterChangedEvent.state().version());
applier.applyClusterState(clusterChangedEvent);
});
}

IngestService.java

@@ -19,6 +19,8 @@
package org.elasticsearch.ingest;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceNotFoundException;
@@ -69,6 +71,8 @@ public class IngestService implements ClusterStateApplier {
public static final String NOOP_PIPELINE_NAME = "_none";
private static final Logger logger = LogManager.getLogger(IngestService.class);
private final ClusterService clusterService;
private final ScriptService scriptService;
private final Map<String, Processor.Factory> processorFactories;
@@ -256,7 +260,11 @@ public class IngestService implements ClusterStateApplier {
public void applyClusterState(final ClusterChangedEvent event) {
ClusterState state = event.state();
Map<String, Pipeline> originalPipelines = pipelines;
innerUpdatePipelines(event.previousState(), state);
try {
innerUpdatePipelines(event.previousState(), state);
} catch (ElasticsearchParseException e) {
logger.warn("failed to update ingest pipelines", e);
}
//pipelines changed, so add the old metrics to the new metrics
if (originalPipelines != pipelines) {
pipelines.forEach((id, pipeline) -> {

CoordinatorTests.java

@@ -26,10 +26,10 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.NodeConnectionsService;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.coordination.ClusterStatePublisher.AckListener;
import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration;
@@ -39,7 +39,9 @@ import org.elasticsearch.cluster.coordination.CoordinatorTests.Cluster.ClusterNo
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNode.Role;
import org.elasticsearch.cluster.service.ClusterApplier;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterApplierService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
@@ -52,6 +54,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.Settings.Builder;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.discovery.zen.PublishClusterStateStats;
import org.elasticsearch.discovery.zen.UnicastHostsProvider.HostsResolver;
@@ -931,7 +934,7 @@ public class CoordinatorTests extends ESTestCase {
cluster.runFor(defaultMillis(FOLLOWER_CHECK_TIMEOUT_SETTING) + defaultMillis(FOLLOWER_CHECK_INTERVAL_SETTING)
+ DEFAULT_CLUSTER_STATE_UPDATE_DELAY, "detecting disconnection");
assertThat(leader.clusterApplier.lastAppliedClusterState.blocks().global(), hasItem(expectedBlock));
assertThat(leader.getLastAppliedClusterState().blocks().global(), hasItem(expectedBlock));
// TODO reboot the leader and verify that the same block is applied when it restarts
}
@@ -1525,12 +1528,12 @@ public class CoordinatorTests extends ESTestCase {
private Coordinator coordinator;
private final DiscoveryNode localNode;
private final MockPersistedState persistedState;
private FakeClusterApplier clusterApplier;
private AckedFakeThreadPoolMasterService masterService;
private DisruptableClusterApplierService clusterApplierService;
private ClusterService clusterService;
private TransportService transportService;
private DisruptableMockTransport mockTransport;
private List<BiConsumer<DiscoveryNode, ClusterState>> extraJoinValidators = new ArrayList<>();
private ClusterStateApplyResponse clusterStateApplyResponse = ClusterStateApplyResponse.SUCCEED;
ClusterNode(int nodeIndex, boolean masterEligible) {
this(nodeIndex, createDiscoveryNode(nodeIndex, masterEligible), defaultPersistedStateSupplier);
@@ -1565,37 +1568,46 @@ public class CoordinatorTests extends ESTestCase {
final Settings settings = Settings.builder()
.putList(ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING.getKey(),
ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING.get(Settings.EMPTY)).build(); // suppress auto-bootstrap
final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
clusterApplier = new FakeClusterApplier(settings, clusterSettings);
masterService = new AckedFakeThreadPoolMasterService("test_node", "test",
runnable -> deterministicTaskQueue.scheduleNow(onNode(runnable)));
transportService = mockTransport.createTransportService(
settings, deterministicTaskQueue.getThreadPool(this::onNode), NOOP_TRANSPORT_INTERCEPTOR,
a -> localNode, null, emptySet());
masterService = new AckedFakeThreadPoolMasterService(localNode.getId(), "test",
runnable -> deterministicTaskQueue.scheduleNow(onNode(runnable)));
final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
clusterApplierService = new DisruptableClusterApplierService(localNode.getId(), settings, clusterSettings,
deterministicTaskQueue, this::onNode);
clusterService = new ClusterService(settings, clusterSettings, masterService, clusterApplierService);
clusterService.setNodeConnectionsService(
new NodeConnectionsService(clusterService.getSettings(), deterministicTaskQueue.getThreadPool(this::onNode),
transportService) {
@Override
public void connectToNodes(DiscoveryNodes discoveryNodes) {
// override this method as it does blocking calls
}
});
final Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators =
Collections.singletonList((dn, cs) -> extraJoinValidators.forEach(validator -> validator.accept(dn, cs)));
coordinator = new Coordinator("test_node", settings, clusterSettings, transportService, writableRegistry(),
ESAllocationTestCase.createAllocationService(Settings.EMPTY), masterService, this::getPersistedState,
Cluster.this::provideUnicastHosts, clusterApplier, onJoinValidators, Randomness.get());
Cluster.this::provideUnicastHosts, clusterApplierService, onJoinValidators, Randomness.get());
masterService.setClusterStatePublisher(coordinator);
logger.trace("starting up [{}]", localNode);
transportService.start();
transportService.acceptIncomingRequests();
masterService.start();
coordinator.start();
clusterService.start();
coordinator.startInitialJoin();
}
void close() {
logger.trace("taking down [{}]", localNode);
//transportService.stop(); // does blocking stuff :/
masterService.stop();
coordinator.stop();
//transportService.close(); // does blocking stuff :/
masterService.close();
clusterService.stop();
//transportService.stop(); // does blocking stuff :/
clusterService.close();
coordinator.close();
//transportService.close(); // does blocking stuff :/
}
ClusterNode restartedNode() {
@@ -1634,11 +1646,11 @@ public class CoordinatorTests extends ESTestCase {
}
void setClusterStateApplyResponse(ClusterStateApplyResponse clusterStateApplyResponse) {
this.clusterStateApplyResponse = clusterStateApplyResponse;
clusterApplierService.clusterStateApplyResponse = clusterStateApplyResponse;
}
ClusterStateApplyResponse getClusterStateApplyResponse() {
return clusterStateApplyResponse;
return clusterApplierService.clusterStateApplyResponse;
}
Runnable onNode(Runnable runnable) {
@@ -1739,7 +1751,7 @@ public class CoordinatorTests extends ESTestCase {
}
ClusterState getLastAppliedClusterState() {
return clusterApplier.lastAppliedClusterState;
return clusterApplierService.state();
}
void applyInitialConfiguration() {
@@ -1769,84 +1781,6 @@ public class CoordinatorTests extends ESTestCase {
private boolean isNotUsefullyBootstrapped() {
return getLocalNode().isMasterNode() == false || coordinator.isInitialConfigurationSet() == false;
}
private class FakeClusterApplier implements ClusterApplier {
final ClusterName clusterName;
private final ClusterSettings clusterSettings;
ClusterState lastAppliedClusterState;
private FakeClusterApplier(Settings settings, ClusterSettings clusterSettings) {
clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
this.clusterSettings = clusterSettings;
}
@Override
public void setInitialState(ClusterState initialState) {
assert lastAppliedClusterState == null;
assert initialState != null;
lastAppliedClusterState = initialState;
}
@Override
public void onNewClusterState(String source, Supplier<ClusterState> clusterStateSupplier, ClusterApplyListener listener) {
switch (clusterStateApplyResponse) {
case SUCCEED:
deterministicTaskQueue.scheduleNow(onNode(new Runnable() {
@Override
public void run() {
final ClusterState oldClusterState = clusterApplier.lastAppliedClusterState;
final ClusterState newClusterState = clusterStateSupplier.get();
assert oldClusterState.version() <= newClusterState.version() : "updating cluster state from version "
+ oldClusterState.version() + " to stale version " + newClusterState.version();
clusterApplier.lastAppliedClusterState = newClusterState;
final Settings incomingSettings = newClusterState.metaData().settings();
clusterSettings.applySettings(incomingSettings); // TODO validation might throw exceptions here.
listener.onSuccess(source);
}
@Override
public String toString() {
return "apply cluster state from [" + source + "]";
}
}));
break;
case FAIL:
deterministicTaskQueue.scheduleNow(onNode(new Runnable() {
@Override
public void run() {
listener.onFailure(source, new ElasticsearchException("cluster state application failed"));
}
@Override
public String toString() {
return "fail to apply cluster state from [" + source + "]";
}
}));
break;
case HANG:
if (randomBoolean()) {
deterministicTaskQueue.scheduleNow(onNode(new Runnable() {
@Override
public void run() {
final ClusterState oldClusterState = clusterApplier.lastAppliedClusterState;
final ClusterState newClusterState = clusterStateSupplier.get();
assert oldClusterState.version() <= newClusterState.version() :
"updating cluster state from version "
+ oldClusterState.version() + " to stale version " + newClusterState.version();
clusterApplier.lastAppliedClusterState = newClusterState;
}
@Override
public String toString() {
return "apply cluster state from [" + source + "] without ack";
}
}));
}
break;
}
}
}
}
private List<TransportAddress> provideUnicastHosts(HostsResolver ignored) {
@@ -1938,6 +1872,52 @@ public class CoordinatorTests extends ESTestCase {
}
}
static class DisruptableClusterApplierService extends ClusterApplierService {
private final String nodeName;
private final DeterministicTaskQueue deterministicTaskQueue;
ClusterStateApplyResponse clusterStateApplyResponse = ClusterStateApplyResponse.SUCCEED;
DisruptableClusterApplierService(String nodeName, Settings settings, ClusterSettings clusterSettings,
DeterministicTaskQueue deterministicTaskQueue, Function<Runnable, Runnable> runnableWrapper) {
super(nodeName, settings, clusterSettings, deterministicTaskQueue.getThreadPool(runnableWrapper));
this.nodeName = nodeName;
this.deterministicTaskQueue = deterministicTaskQueue;
addStateApplier(event -> {
switch (clusterStateApplyResponse) {
case SUCCEED:
case HANG:
final ClusterState oldClusterState = event.previousState();
final ClusterState newClusterState = event.state();
assert oldClusterState.version() <= newClusterState.version() : "updating cluster state from version "
+ oldClusterState.version() + " to stale version " + newClusterState.version();
break;
case FAIL:
throw new ElasticsearchException("simulated cluster state applier failure");
}
});
}
@Override
protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
return new MockSinglePrioritizingExecutor(nodeName, deterministicTaskQueue);
}
@Override
public void onNewClusterState(String source, Supplier<ClusterState> clusterStateSupplier, ClusterApplyListener listener) {
if (clusterStateApplyResponse == ClusterStateApplyResponse.HANG) {
if (randomBoolean()) {
// apply cluster state, but don't notify listener
super.onNewClusterState(source, clusterStateSupplier, (source1, e) -> {
// ignore result
});
}
} else {
super.onNewClusterState(source, clusterStateSupplier, listener);
}
}
}
private static DiscoveryNode createDiscoveryNode(int nodeIndex, boolean masterEligible) {
final TransportAddress address = buildNewFakeTransportAddress();
return new DiscoveryNode("", "node" + nodeIndex,

ClusterApplierServiceTests.java

@@ -28,8 +28,10 @@ import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.LocalNodeMasterListener;
import org.elasticsearch.cluster.NodeConnectionsService;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.cluster.service.ClusterApplier.ClusterApplyListener;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.ClusterSettings;
@@ -53,6 +55,7 @@ import java.util.concurrent.atomic.AtomicReference;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.elasticsearch.test.ClusterServiceUtils.setState;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
public class ClusterApplierServiceTests extends ESTestCase {
@@ -357,6 +360,97 @@ public class ClusterApplierServiceTests extends ESTestCase {
assertTrue(applierCalled.get());
}
public void testClusterStateApplierBubblesUpExceptionsInApplier() throws InterruptedException {
AtomicReference<Throwable> error = new AtomicReference<>();
clusterApplierService.addStateApplier(event -> {
throw new RuntimeException("dummy exception");
});
CountDownLatch latch = new CountDownLatch(1);
clusterApplierService.onNewClusterState("test", () -> ClusterState.builder(clusterApplierService.state()).build(),
new ClusterApplyListener() {
@Override
public void onSuccess(String source) {
latch.countDown();
fail("should not be called");
}
@Override
public void onFailure(String source, Exception e) {
assertTrue(error.compareAndSet(null, e));
latch.countDown();
}
}
);
latch.await();
assertNotNull(error.get());
assertThat(error.get().getMessage(), containsString("dummy exception"));
}
public void testClusterStateApplierBubblesUpExceptionsInSettingsApplier() throws InterruptedException {
AtomicReference<Throwable> error = new AtomicReference<>();
clusterApplierService.clusterSettings.addSettingsUpdateConsumer(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING,
v -> {});
CountDownLatch latch = new CountDownLatch(1);
clusterApplierService.onNewClusterState("test", () -> ClusterState.builder(clusterApplierService.state())
.metaData(MetaData.builder(clusterApplierService.state().metaData())
.persistentSettings(
Settings.builder().put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), false).build())
.build())
.build(),
new ClusterApplyListener() {
@Override
public void onSuccess(String source) {
latch.countDown();
fail("should not be called");
}
@Override
public void onFailure(String source, Exception e) {
assertTrue(error.compareAndSet(null, e));
latch.countDown();
}
}
);
latch.await();
assertNotNull(error.get());
assertThat(error.get().getMessage(), containsString("illegal value can't update"));
}
public void testClusterStateApplierSwallowsExceptionInListener() throws InterruptedException {
AtomicReference<Throwable> error = new AtomicReference<>();
AtomicBoolean applierCalled = new AtomicBoolean();
clusterApplierService.addListener(event -> {
assertTrue(applierCalled.compareAndSet(false, true));
throw new RuntimeException("dummy exception");
});
CountDownLatch latch = new CountDownLatch(1);
clusterApplierService.onNewClusterState("test", () -> ClusterState.builder(clusterApplierService.state()).build(),
new ClusterApplyListener() {
@Override
public void onSuccess(String source) {
latch.countDown();
}
@Override
public void onFailure(String source, Exception e) {
error.compareAndSet(null, e);
}
}
);
latch.await();
assertNull(error.get());
assertTrue(applierCalled.get());
}
public void testClusterStateApplierCanCreateAnObserver() throws InterruptedException {
AtomicReference<Throwable> error = new AtomicReference<>();
AtomicBoolean applierCalled = new AtomicBoolean();
@@ -407,10 +501,12 @@ public class ClusterApplierServiceTests extends ESTestCase {
static class TimedClusterApplierService extends ClusterApplierService {
final ClusterSettings clusterSettings;
public volatile Long currentTimeOverride = null;
TimedClusterApplierService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
super("test_node", settings, clusterSettings, threadPool);
this.clusterSettings = clusterSettings;
}
@Override

IngestServiceTests.java

@@ -19,6 +19,9 @@
package org.elasticsearch.ingest;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException;
@@ -39,11 +42,13 @@ import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.MockLogAppender;
import org.elasticsearch.threadpool.ThreadPool;
import org.hamcrest.CustomTypeSafeMatcher;
import org.mockito.ArgumentMatcher;
@@ -254,7 +259,7 @@ public class IngestServiceTests extends ESTestCase {
assertThat(pipeline.getProcessors().size(), equalTo(0));
}
public void testPutWithErrorResponse() {
public void testPutWithErrorResponse() throws IllegalAccessException {
IngestService ingestService = createWithProcessors();
String id = "_id";
Pipeline pipeline = ingestService.getPipeline(id);
@@ -265,11 +270,22 @@ public class IngestServiceTests extends ESTestCase {
new PutPipelineRequest(id, new BytesArray("{\"description\": \"empty processors\"}"), XContentType.JSON);
ClusterState previousClusterState = clusterState;
clusterState = IngestService.innerPut(putRequest, clusterState);
MockLogAppender mockAppender = new MockLogAppender();
mockAppender.start();
mockAppender.addExpectation(
new MockLogAppender.SeenEventExpectation(
"test1",
IngestService.class.getCanonicalName(),
Level.WARN,
"failed to update ingest pipelines"));
Logger ingestLogger = LogManager.getLogger(IngestService.class);
Loggers.addAppender(ingestLogger, mockAppender);
try {
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
fail("should fail");
} catch (ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("[processors] required property is missing"));
mockAppender.assertAllExpectationsMatched();
} finally {
Loggers.removeAppender(ingestLogger, mockAppender);
mockAppender.stop();
}
pipeline = ingestService.getPipeline(id);
assertNotNull(pipeline);

MockSinglePrioritizingExecutor.java

@@ -53,6 +53,12 @@ public class MockSinglePrioritizingExecutor extends PrioritizedEsThreadPoolExecu
throw new KillWorkerError();
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) {
// ensures we don't block
return false;
}
private static final class KillWorkerError extends Error {
}
}