From e122505a9131a9fb23a7923927911bf3058e7636 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Mon, 13 Aug 2018 18:03:08 +0200 Subject: [PATCH] Zen2: Deterministic MasterService (#32493) Increases testability of MasterService and the discovery layer. Changes: - Async publish method - Moved a few interfaces/classes top-level to simplify imports - Deterministic MasterService implementation for tests --- .../elasticsearch/ElasticsearchException.java | 9 +- .../master/TransportMasterNodeAction.java | 4 +- .../action/shard/ShardStateAction.java | 6 +- .../coordination/ClusterStatePublisher.java | 58 +++++++ .../FailedToCommitClusterStateException.java | 43 +++++ .../cluster/coordination/Publication.java | 5 +- .../cluster/service/ClusterService.java | 16 +- .../cluster/service/MasterService.java | 138 ++++++++++------ .../common/util/concurrent/BaseFuture.java | 15 +- .../PrioritizedEsThreadPoolExecutor.java | 2 +- .../elasticsearch/discovery/Discovery.java | 55 +------ .../discovery/single/SingleNodeDiscovery.java | 23 +-- .../zen/PublishClusterStateAction.java | 15 +- .../discovery/zen/ZenDiscovery.java | 43 +++-- .../org/elasticsearch/gateway/Gateway.java | 2 +- .../ExceptionSerializationTests.java | 2 +- .../TransportMasterNodeActionTests.java | 4 +- .../TransportReplicationActionTests.java | 8 +- .../cluster/MinimumMasterNodesIT.java | 4 +- .../action/shard/ShardStateActionTests.java | 4 +- .../cluster/service/ClusterSerivceTests.java | 2 +- .../cluster/service/MasterServiceTests.java | 21 +-- .../discovery/SnapshotDisruptionIT.java | 3 +- .../zen/PublishClusterStateActionTests.java | 9 +- .../discovery/zen/ZenDiscoveryUnitTests.java | 73 ++++++--- .../cluster/FakeThreadPoolMasterService.java | 149 ++++++++++++++++++ .../FakeThreadPoolMasterServiceTests.java | 130 +++++++++++++++ .../org/elasticsearch/test/NoopDiscovery.java | 5 +- .../test/ClusterServiceUtils.java | 31 ++-- 29 files changed, 636 insertions(+), 243 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/cluster/coordination/ClusterStatePublisher.java create mode 100644 server/src/main/java/org/elasticsearch/cluster/coordination/FailedToCommitClusterStateException.java create mode 100644 server/src/test/java/org/elasticsearch/indices/cluster/FakeThreadPoolMasterService.java create mode 100644 server/src/test/java/org/elasticsearch/indices/cluster/FakeThreadPoolMasterServiceTests.java diff --git a/server/src/main/java/org/elasticsearch/ElasticsearchException.java b/server/src/main/java/org/elasticsearch/ElasticsearchException.java index 01c24acccdd..0c5ee331515 100644 --- a/server/src/main/java/org/elasticsearch/ElasticsearchException.java +++ b/server/src/main/java/org/elasticsearch/ElasticsearchException.java @@ -21,7 +21,6 @@ package org.elasticsearch; import org.elasticsearch.action.support.replication.ReplicationOperation; import org.elasticsearch.cluster.action.shard.ShardStateAction; -import org.elasticsearch.cluster.coordination.CoordinationStateRejectedException; import org.elasticsearch.common.CheckedFunction; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; @@ -1006,8 +1005,8 @@ public class ElasticsearchException extends RuntimeException implements ToXConte UNKNOWN_VERSION_ADDED), TYPE_MISSING_EXCEPTION(org.elasticsearch.indices.TypeMissingException.class, org.elasticsearch.indices.TypeMissingException::new, 137, UNKNOWN_VERSION_ADDED), - FAILED_TO_COMMIT_CLUSTER_STATE_EXCEPTION(org.elasticsearch.discovery.Discovery.FailedToCommitClusterStateException.class, - org.elasticsearch.discovery.Discovery.FailedToCommitClusterStateException::new, 140, UNKNOWN_VERSION_ADDED), + FAILED_TO_COMMIT_CLUSTER_STATE_EXCEPTION(org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException.class, + org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException::new, 140, UNKNOWN_VERSION_ADDED), QUERY_SHARD_EXCEPTION(org.elasticsearch.index.query.QueryShardException.class, org.elasticsearch.index.query.QueryShardException::new, 141, UNKNOWN_VERSION_ADDED), NO_LONGER_PRIMARY_SHARD_EXCEPTION(ShardStateAction.NoLongerPrimaryShardException.class, @@ -1026,8 +1025,8 @@ public class ElasticsearchException extends RuntimeException implements ToXConte org.elasticsearch.common.xcontent.UnknownNamedObjectException::new, 148, Version.V_5_2_0), TOO_MANY_BUCKETS_EXCEPTION(MultiBucketConsumerService.TooManyBucketsException.class, MultiBucketConsumerService.TooManyBucketsException::new, 149, Version.V_7_0_0_alpha1), - COORDINATION_STATE_REJECTED_EXCEPTION(CoordinationStateRejectedException.class, - CoordinationStateRejectedException::new, 150, Version.V_7_0_0_alpha1); + COORDINATION_STATE_REJECTED_EXCEPTION(org.elasticsearch.cluster.coordination.CoordinationStateRejectedException.class, + org.elasticsearch.cluster.coordination.CoordinationStateRejectedException::new, 150, Version.V_7_0_0_alpha1); final Class exceptionClass; diff --git a/server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java b/server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java index 934241a8fcb..8e68bc00bf2 100644 --- a/server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java @@ -38,7 +38,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.discovery.Discovery; +import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException; import org.elasticsearch.discovery.MasterNotDiscoveredException; import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.tasks.Task; @@ -174,7 +174,7 @@ public abstract class TransportMasterNodeAction new ParameterizedMessage("master could not publish cluster state or stepped down before publishing action [{}], scheduling a retry", actionName), t); retry(t, masterChangePredicate); diff --git a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index 0949e47cd05..34844cd941b 100644 --- a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -49,7 +49,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.discovery.Discovery; +import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.tasks.Task; @@ -132,7 +132,7 @@ public class ShardStateAction extends AbstractComponent { private static Class[] MASTER_CHANNEL_EXCEPTIONS = new Class[]{ NotMasterException.class, ConnectTransportException.class, - Discovery.FailedToCommitClusterStateException.class + FailedToCommitClusterStateException.class }; private static boolean isMasterChannelException(TransportException exp) { @@ -625,7 +625,7 @@ public class ShardStateAction extends AbstractComponent { * are: * - {@link NotMasterException} * - {@link NodeDisconnectedException} - * - {@link Discovery.FailedToCommitClusterStateException} + * - {@link FailedToCommitClusterStateException} * * Any other exception is communicated to the requester via * this notification. diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterStatePublisher.java b/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterStatePublisher.java new file mode 100644 index 00000000000..91f9beacd5c --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterStatePublisher.java @@ -0,0 +1,58 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.coordination; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.unit.TimeValue; + +public interface ClusterStatePublisher { + /** + * Publish all the changes to the cluster from the master (can be called just by the master). The publish + * process should apply this state to the master as well! + * + * The publishListener allows to wait for the publication to complete, which can be either successful completion, timing out or failing. + * The method is guaranteed to pass back a {@link FailedToCommitClusterStateException} to the publishListener if the change is not + * committed and should be rejected. Any other exception signals that something bad happened but the change is committed. + * + * The {@link AckListener} allows to keep track of the ack received from nodes, and verify whether + * they updated their own cluster state or not. + */ + void publish(ClusterChangedEvent clusterChangedEvent, ActionListener publishListener, AckListener ackListener); + + interface AckListener { + /** + * Should be called when the cluster coordination layer has committed the cluster state (i.e. even if this publication fails, + * it is guaranteed to appear in future publications). + * @param commitTime the time it took to commit the cluster state + */ + void onCommit(TimeValue commitTime); + + /** + * Should be called whenever the cluster coordination layer receives confirmation from a node that it has successfully applied + * the cluster state. In case of failures, an exception should be provided as parameter. + * @param node the node + * @param e the optional exception + */ + void onNodeAck(DiscoveryNode node, @Nullable Exception e); + } + +} diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/FailedToCommitClusterStateException.java b/server/src/main/java/org/elasticsearch/cluster/coordination/FailedToCommitClusterStateException.java new file mode 100644 index 00000000000..1f7eb128e94 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/FailedToCommitClusterStateException.java @@ -0,0 +1,43 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.coordination; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.cluster.coordination.ClusterStatePublisher; +import org.elasticsearch.common.io.stream.StreamInput; + +import java.io.IOException; + +/** + * Thrown when failing to publish a cluster state. See {@link ClusterStatePublisher} for more details. + */ +public class FailedToCommitClusterStateException extends ElasticsearchException { + + public FailedToCommitClusterStateException(StreamInput in) throws IOException { + super(in); + } + + public FailedToCommitClusterStateException(String msg, Object... args) { + super(msg, args); + } + + public FailedToCommitClusterStateException(String msg, Throwable cause, Object... args) { + super(msg, cause, args); + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java index 3fef7415739..d2b29aa999b 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java @@ -22,12 +22,11 @@ package org.elasticsearch.cluster.coordination; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.coordination.ClusterStatePublisher.AckListener; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.discovery.Discovery; -import org.elasticsearch.discovery.Discovery.AckListener; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportResponse; @@ -145,7 +144,7 @@ public abstract class Publication extends AbstractComponent { if (isPublishQuorum(possiblySuccessfulNodes) == false) { logger.debug("onPossibleCommitFailure: non-failed nodes {} do not form a quorum, so {} cannot succeed", possiblySuccessfulNodes, this); - Exception e = new Discovery.FailedToCommitClusterStateException("non-failed nodes do not form a quorum"); + Exception e = new FailedToCommitClusterStateException("non-failed nodes do not form a quorum"); publicationTargets.stream().filter(PublicationTarget::isActive).forEach(pt -> pt.setFailed(e)); onPossibleCompletion(); } diff --git a/server/src/main/java/org/elasticsearch/cluster/service/ClusterService.java b/server/src/main/java/org/elasticsearch/cluster/service/ClusterService.java index 7610d75f677..df7f20ca357 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/ClusterService.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/ClusterService.java @@ -59,25 +59,31 @@ public class ClusterService extends AbstractLifecycleComponent { private final OperationRouting operationRouting; private final ClusterSettings clusterSettings; - private final Map> initialClusterStateCustoms; public ClusterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool, Map> initialClusterStateCustoms) { + this(settings, clusterSettings, new MasterService(settings, threadPool), + new ClusterApplierService(settings, clusterSettings, threadPool, + () -> ClusterService.newClusterStateBuilder(settings, initialClusterStateCustoms))); + } + + public ClusterService(Settings settings, ClusterSettings clusterSettings, + MasterService masterService, ClusterApplierService clusterApplierService) { super(settings); - this.masterService = new MasterService(settings, threadPool); + this.masterService = masterService; this.operationRouting = new OperationRouting(settings, clusterSettings); this.clusterSettings = clusterSettings; this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings); this.clusterSettings.addSettingsUpdateConsumer(CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, this::setSlowTaskLoggingThreshold); - this.initialClusterStateCustoms = initialClusterStateCustoms; - this.clusterApplierService = new ClusterApplierService(settings, clusterSettings, threadPool, this::newClusterStateBuilder); + this.clusterApplierService = clusterApplierService; } /** * Creates a new cluster state builder that is initialized with the cluster name and all initial cluster state customs. */ - public ClusterState.Builder newClusterStateBuilder() { + private static ClusterState.Builder newClusterStateBuilder(Settings settings, + Map> initialClusterStateCustoms) { ClusterState.Builder builder = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings)); for (Map.Entry> entry : initialClusterStateCustoms.entrySet()) { builder.putCustom(entry.getKey(), entry.getValue().get()); diff --git a/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java b/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java index 8927adfd434..59e4fc38522 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java @@ -22,6 +22,7 @@ package org.elasticsearch.cluster.service; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.Assertions; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.AckedClusterStateTaskListener; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; @@ -48,7 +49,9 @@ import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.cluster.coordination.ClusterStatePublisher; import org.elasticsearch.discovery.Discovery; +import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException; import org.elasticsearch.threadpool.ThreadPool; import java.util.Arrays; @@ -59,7 +62,6 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.function.BiConsumer; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -70,7 +72,7 @@ public class MasterService extends AbstractLifecycleComponent { public static final String MASTER_UPDATE_THREAD_NAME = "masterService#updateTask"; - private BiConsumer clusterStatePublisher; + protected ClusterStatePublisher clusterStatePublisher; private java.util.function.Supplier clusterStateSupplier; @@ -92,7 +94,7 @@ public class MasterService extends AbstractLifecycleComponent { this.slowTaskLoggingThreshold = slowTaskLoggingThreshold; } - public synchronized void setClusterStatePublisher(BiConsumer publisher) { + public synchronized void setClusterStatePublisher(ClusterStatePublisher publisher) { clusterStatePublisher = publisher; } @@ -104,12 +106,16 @@ public class MasterService extends AbstractLifecycleComponent { protected synchronized void doStart() { Objects.requireNonNull(clusterStatePublisher, "please set a cluster state publisher before starting"); Objects.requireNonNull(clusterStateSupplier, "please set a cluster state supplier before starting"); - threadPoolExecutor = EsExecutors.newSinglePrioritizing( + threadPoolExecutor = createThreadPoolExecutor(); + taskBatcher = new Batcher(logger, threadPoolExecutor); + } + + protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() { + return EsExecutors.newSinglePrioritizing( nodeName() + "/" + MASTER_UPDATE_THREAD_NAME, daemonThreadFactory(settings, MASTER_UPDATE_THREAD_NAME), threadPool.getThreadContext(), threadPool.scheduler()); - taskBatcher = new Batcher(logger, threadPoolExecutor); } class Batcher extends TaskBatcher { @@ -166,14 +172,17 @@ public class MasterService extends AbstractLifecycleComponent { return clusterStateSupplier.get(); } + private static boolean isMasterUpdateThread() { + return Thread.currentThread().getName().contains(MASTER_UPDATE_THREAD_NAME); + } + public static boolean assertMasterUpdateThread() { - assert Thread.currentThread().getName().contains(MASTER_UPDATE_THREAD_NAME) : - "not called from the master service thread"; + assert isMasterUpdateThread() : "not called from the master service thread"; return true; } public static boolean assertNotMasterUpdateThread(String reason) { - assert Thread.currentThread().getName().contains(MASTER_UPDATE_THREAD_NAME) == false : + assert isMasterUpdateThread() == false : "Expected current thread [" + Thread.currentThread() + "] to not be the master service thread. Reason: [" + reason + "]"; return true; } @@ -222,48 +231,77 @@ public class MasterService extends AbstractLifecycleComponent { } logger.debug("publishing cluster state version [{}]", newClusterState.version()); - try { - clusterStatePublisher.accept(clusterChangedEvent, taskOutputs.createAckListener(threadPool, newClusterState)); - } catch (Discovery.FailedToCommitClusterStateException t) { - final long version = newClusterState.version(); - logger.warn(() -> new ParameterizedMessage( - "failing [{}]: failed to commit cluster state version [{}]", summary, version), t); - taskOutputs.publishingFailed(t); - return; - } - - taskOutputs.processedDifferentClusterState(previousClusterState, newClusterState); - - try { - taskOutputs.clusterStatePublished(clusterChangedEvent); - } catch (Exception e) { - logger.error(() -> new ParameterizedMessage( - "exception thrown while notifying executor of new cluster state publication [{}]", - summary), e); - } - TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS))); - logger.debug("processing [{}]: took [{}] done publishing updated cluster state (version: {}, uuid: {})", summary, - executionTime, newClusterState.version(), - newClusterState.stateUUID()); - warnAboutSlowTaskIfNeeded(executionTime, summary); + publish(clusterChangedEvent, taskOutputs, startTimeNS); } catch (Exception e) { - TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS))); - final long version = newClusterState.version(); - final String stateUUID = newClusterState.stateUUID(); - final String fullState = newClusterState.toString(); - logger.warn(() -> new ParameterizedMessage( - "failed to publish updated cluster state in [{}]:\nversion [{}], uuid [{}], source [{}]\n{}", - executionTime, - version, - stateUUID, - summary, - fullState), - e); - // TODO: do we want to call updateTask.onFailure here? + handleException(summary, startTimeNS, newClusterState, e); } } } + protected void publish(ClusterChangedEvent clusterChangedEvent, TaskOutputs taskOutputs, long startTimeNS) { + final PlainActionFuture fut = new PlainActionFuture() { + @Override + protected boolean blockingAllowed() { + return isMasterUpdateThread() || super.blockingAllowed(); + } + }; + clusterStatePublisher.publish(clusterChangedEvent, fut, taskOutputs.createAckListener(threadPool, clusterChangedEvent.state())); + + // indefinitely wait for publication to complete + try { + FutureUtils.get(fut); + onPublicationSuccess(clusterChangedEvent, taskOutputs, startTimeNS); + } catch (Exception e) { + onPublicationFailed(clusterChangedEvent, taskOutputs, startTimeNS, e); + } + } + + protected void onPublicationSuccess(ClusterChangedEvent clusterChangedEvent, TaskOutputs taskOutputs, long startTimeNS) { + taskOutputs.processedDifferentClusterState(clusterChangedEvent.previousState(), clusterChangedEvent.state()); + + try { + taskOutputs.clusterStatePublished(clusterChangedEvent); + } catch (Exception e) { + logger.error(() -> new ParameterizedMessage( + "exception thrown while notifying executor of new cluster state publication [{}]", + clusterChangedEvent.source()), e); + } + TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS))); + logger.debug("processing [{}]: took [{}] done publishing updated cluster state (version: {}, uuid: {})", + clusterChangedEvent.source(), + executionTime, clusterChangedEvent.state().version(), + clusterChangedEvent.state().stateUUID()); + warnAboutSlowTaskIfNeeded(executionTime, clusterChangedEvent.source()); + } + + protected void onPublicationFailed(ClusterChangedEvent clusterChangedEvent, TaskOutputs taskOutputs, long startTimeNS, + Exception exception) { + if (exception instanceof FailedToCommitClusterStateException) { + final long version = clusterChangedEvent.state().version(); + logger.warn(() -> new ParameterizedMessage( + "failing [{}]: failed to commit cluster state version [{}]", clusterChangedEvent.source(), version), exception); + taskOutputs.publishingFailed((FailedToCommitClusterStateException) exception); + } else { + handleException(clusterChangedEvent.source(), startTimeNS, clusterChangedEvent.state(), exception); + } + } + + private void handleException(String summary, long startTimeNS, ClusterState newClusterState, Exception e) { + TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS))); + final long version = newClusterState.version(); + final String stateUUID = newClusterState.stateUUID(); + final String fullState = newClusterState.toString(); + logger.warn(() -> new ParameterizedMessage( + "failed to publish updated cluster state in [{}]:\nversion [{}], uuid [{}], source [{}]\n{}", + executionTime, + version, + stateUUID, + summary, + fullState), + e); + // TODO: do we want to call updateTask.onFailure here? + } + public TaskOutputs calculateTaskOutputs(TaskInputs taskInputs, ClusterState previousClusterState, long startTimeNS) { ClusterTasksResult clusterTasksResult = executeTasks(taskInputs, startTimeNS, previousClusterState); ClusterState newClusterState = patchVersions(previousClusterState, clusterTasksResult); @@ -276,7 +314,7 @@ public class MasterService extends AbstractLifecycleComponent { if (previousClusterState != newClusterState) { // only the master controls the version numbers - Builder builder = ClusterState.builder(newClusterState).incrementVersion(); + Builder builder = incrementVersion(newClusterState); if (previousClusterState.routingTable() != newClusterState.routingTable()) { builder.routingTable(RoutingTable.builder(newClusterState.routingTable()) .version(newClusterState.routingTable().version() + 1).build()); @@ -291,6 +329,10 @@ public class MasterService extends AbstractLifecycleComponent { return newClusterState; } + protected Builder incrementVersion(ClusterState clusterState) { + return ClusterState.builder(clusterState).incrementVersion(); + } + /** * Submits a cluster state update task; unlike {@link #submitStateUpdateTask(String, Object, ClusterStateTaskConfig, * ClusterStateTaskExecutor, ClusterStateTaskListener)}, submitted updates will not be batched. @@ -335,7 +377,7 @@ public class MasterService extends AbstractLifecycleComponent { /** * Output created by executing a set of tasks provided as TaskInputs */ - class TaskOutputs { + protected class TaskOutputs { public final TaskInputs taskInputs; public final ClusterState previousClusterState; public final ClusterState newClusterState; @@ -353,7 +395,7 @@ public class MasterService extends AbstractLifecycleComponent { this.executionResults = executionResults; } - public void publishingFailed(Discovery.FailedToCommitClusterStateException t) { + public void publishingFailed(FailedToCommitClusterStateException t) { nonFailedTasks.forEach(task -> task.listener.onFailure(task.source(), t)); } diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/BaseFuture.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/BaseFuture.java index 3436ccdf7ad..2a379717f38 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/BaseFuture.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/BaseFuture.java @@ -61,11 +61,7 @@ public abstract class BaseFuture implements Future { @Override public V get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException, ExecutionException { - assert timeout <= 0 || - (Transports.assertNotTransportThread(BLOCKING_OP_REASON) && - ThreadPool.assertNotScheduleThread(BLOCKING_OP_REASON) && - ClusterApplierService.assertNotClusterStateUpdateThread(BLOCKING_OP_REASON) && - MasterService.assertNotMasterUpdateThread(BLOCKING_OP_REASON)); + assert timeout <= 0 || blockingAllowed(); return sync.get(unit.toNanos(timeout)); } @@ -87,11 +83,16 @@ public abstract class BaseFuture implements Future { */ @Override public V get() throws InterruptedException, ExecutionException { - assert Transports.assertNotTransportThread(BLOCKING_OP_REASON) && + assert blockingAllowed(); + return sync.get(); + } + + // protected so that it can be overridden in specific instances + protected boolean blockingAllowed() { + return Transports.assertNotTransportThread(BLOCKING_OP_REASON) && ThreadPool.assertNotScheduleThread(BLOCKING_OP_REASON) && ClusterApplierService.assertNotClusterStateUpdateThread(BLOCKING_OP_REASON) && MasterService.assertNotMasterUpdateThread(BLOCKING_OP_REASON); - return sync.get(); } @Override diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedEsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedEsThreadPoolExecutor.java index 8f9245ad583..d1157efe77a 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedEsThreadPoolExecutor.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedEsThreadPoolExecutor.java @@ -48,7 +48,7 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor { private final Queue current = ConcurrentCollections.newQueue(); private final ScheduledExecutorService timer; - PrioritizedEsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, + public PrioritizedEsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory, ThreadContext contextHolder, ScheduledExecutorService timer) { super(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue<>(), threadFactory, contextHolder); this.timer = timer; diff --git a/server/src/main/java/org/elasticsearch/discovery/Discovery.java b/server/src/main/java/org/elasticsearch/discovery/Discovery.java index b58f61bac89..e71f1ac846d 100644 --- a/server/src/main/java/org/elasticsearch/discovery/Discovery.java +++ b/server/src/main/java/org/elasticsearch/discovery/Discovery.java @@ -19,66 +19,15 @@ package org.elasticsearch.discovery; -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.cluster.ClusterChangedEvent; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.Nullable; +import org.elasticsearch.cluster.coordination.ClusterStatePublisher; import org.elasticsearch.common.component.LifecycleComponent; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.unit.TimeValue; - -import java.io.IOException; /** * A pluggable module allowing to implement discovery of other nodes, publishing of the cluster * state to all nodes, electing a master of the cluster that raises cluster state change * events. */ -public interface Discovery extends LifecycleComponent { - - /** - * Publish all the changes to the cluster from the master (can be called just by the master). The publish - * process should apply this state to the master as well! - * - * The {@link AckListener} allows to keep track of the ack received from nodes, and verify whether - * they updated their own cluster state or not. - * - * The method is guaranteed to throw a {@link FailedToCommitClusterStateException} if the change is not committed and should be rejected. - * Any other exception signals the something wrong happened but the change is committed. - */ - void publish(ClusterChangedEvent clusterChangedEvent, AckListener ackListener); - - interface AckListener { - /** - * Should be called when the discovery layer has committed the clusters state (i.e. even if this publication fails, - * it is guaranteed to appear in future publications). - * @param commitTime the time it took to commit the cluster state - */ - void onCommit(TimeValue commitTime); - - /** - * Should be called whenever the discovery layer receives confirmation from a node that it has successfully applied - * the cluster state. In case of failures, an exception should be provided as parameter. - * @param node the node - * @param e the optional exception - */ - void onNodeAck(DiscoveryNode node, @Nullable Exception e); - } - - class FailedToCommitClusterStateException extends ElasticsearchException { - - public FailedToCommitClusterStateException(StreamInput in) throws IOException { - super(in); - } - - public FailedToCommitClusterStateException(String msg, Object... args) { - super(msg, args); - } - - public FailedToCommitClusterStateException(String msg, Throwable cause, Object... args) { - super(msg, cause, args); - } - } +public interface Discovery extends LifecycleComponent, ClusterStatePublisher { /** * @return stats about the discovery diff --git a/server/src/main/java/org/elasticsearch/discovery/single/SingleNodeDiscovery.java b/server/src/main/java/org/elasticsearch/discovery/single/SingleNodeDiscovery.java index d7c37febb5d..84718186824 100644 --- a/server/src/main/java/org/elasticsearch/discovery/single/SingleNodeDiscovery.java +++ b/server/src/main/java/org/elasticsearch/discovery/single/SingleNodeDiscovery.java @@ -20,6 +20,7 @@ package org.elasticsearch.discovery.single; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlocks; @@ -35,9 +36,7 @@ import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.DiscoveryStats; import org.elasticsearch.transport.TransportService; -import java.io.IOException; import java.util.Objects; -import java.util.concurrent.CountDownLatch; import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; @@ -59,33 +58,25 @@ public class SingleNodeDiscovery extends AbstractLifecycleComponent implements D } @Override - public synchronized void publish(final ClusterChangedEvent event, + public synchronized void publish(final ClusterChangedEvent event, ActionListener publishListener, final AckListener ackListener) { clusterState = event.state(); ackListener.onCommit(TimeValue.ZERO); - CountDownLatch latch = new CountDownLatch(1); - ClusterApplyListener listener = new ClusterApplyListener() { + clusterApplier.onNewClusterState("apply-locally-on-node[" + event.source() + "]", () -> clusterState, new ClusterApplyListener() { @Override public void onSuccess(String source) { - latch.countDown(); + publishListener.onResponse(null); ackListener.onNodeAck(transportService.getLocalNode(), null); } @Override public void onFailure(String source, Exception e) { - latch.countDown(); + publishListener.onFailure(e); ackListener.onNodeAck(transportService.getLocalNode(), e); logger.warn(() -> new ParameterizedMessage("failed while applying cluster state locally [{}]", event.source()), e); } - }; - clusterApplier.onNewClusterState("apply-locally-on-node[" + event.source() + "]", () -> clusterState, listener); - - try { - latch.await(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } + }); } @Override @@ -129,7 +120,7 @@ public class SingleNodeDiscovery extends AbstractLifecycleComponent implements D } @Override - protected void doClose() throws IOException { + protected void doClose() { } diff --git a/server/src/main/java/org/elasticsearch/discovery/zen/PublishClusterStateAction.java b/server/src/main/java/org/elasticsearch/discovery/zen/PublishClusterStateAction.java index 5e9f960e893..9595a38ea34 100644 --- a/server/src/main/java/org/elasticsearch/discovery/zen/PublishClusterStateAction.java +++ b/server/src/main/java/org/elasticsearch/discovery/zen/PublishClusterStateAction.java @@ -45,6 +45,7 @@ import org.elasticsearch.discovery.AckClusterStatePublishResponseHandler; import org.elasticsearch.discovery.BlockingClusterStatePublishResponseHandler; import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.DiscoverySettings; +import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.BytesTransportRequest; @@ -117,12 +118,12 @@ public class PublishClusterStateAction extends AbstractComponent { * publishes a cluster change event to other nodes. if at least minMasterNodes acknowledge the change it is committed and will * be processed by the master and the other nodes. *

- * The method is guaranteed to throw a {@link org.elasticsearch.discovery.Discovery.FailedToCommitClusterStateException} + * The method is guaranteed to throw a {@link FailedToCommitClusterStateException} * if the change is not committed and should be rejected. * Any other exception signals the something wrong happened but the change is committed. */ public void publish(final ClusterChangedEvent clusterChangedEvent, final int minMasterNodes, - final Discovery.AckListener ackListener) throws Discovery.FailedToCommitClusterStateException { + final Discovery.AckListener ackListener) throws FailedToCommitClusterStateException { final DiscoveryNodes nodes; final SendingController sendingController; final Set nodesToPublishTo; @@ -155,19 +156,19 @@ public class PublishClusterStateAction extends AbstractComponent { sendingController = new SendingController(clusterChangedEvent.state(), minMasterNodes, totalMasterNodes, publishResponseHandler); } catch (Exception e) { - throw new Discovery.FailedToCommitClusterStateException("unexpected error while preparing to publish", e); + throw new FailedToCommitClusterStateException("unexpected error while preparing to publish", e); } try { innerPublish(clusterChangedEvent, nodesToPublishTo, sendingController, ackListener, sendFullVersion, serializedStates, serializedDiffs); - } catch (Discovery.FailedToCommitClusterStateException t) { + } catch (FailedToCommitClusterStateException t) { throw t; } catch (Exception e) { // try to fail committing, in cause it's still on going if (sendingController.markAsFailed("unexpected error", e)) { // signal the change should be rejected - throw new Discovery.FailedToCommitClusterStateException("unexpected error", e); + throw new FailedToCommitClusterStateException("unexpected error", e); } else { throw e; } @@ -518,7 +519,7 @@ public class PublishClusterStateAction extends AbstractComponent { this.neededMastersToCommit = Math.max(0, minMasterNodes - 1); // we are one of the master nodes this.pendingMasterNodes = totalMasterNodes - 1; if (this.neededMastersToCommit > this.pendingMasterNodes) { - throw new Discovery.FailedToCommitClusterStateException("not enough masters to ack sent cluster state." + + throw new FailedToCommitClusterStateException("not enough masters to ack sent cluster state." + "[{}] needed , have [{}]", neededMastersToCommit, pendingMasterNodes); } this.committed = neededMastersToCommit == 0; @@ -537,7 +538,7 @@ public class PublishClusterStateAction extends AbstractComponent { markAsFailed("timed out waiting for commit (commit timeout [" + commitTimeout + "])"); } if (isCommitted() == false) { - throw new Discovery.FailedToCommitClusterStateException("{} enough masters to ack sent cluster state. [{}] left", + throw new FailedToCommitClusterStateException("{} enough masters to ack sent cluster state. [{}] left", timedout ? "timed out while waiting for" : "failed to get", neededMastersToCommit); } } diff --git a/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index eb9a9f8d488..02415f8824d 100644 --- a/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -21,7 +21,6 @@ package org.elasticsearch.discovery.zen; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; -import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; @@ -52,9 +51,11 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.discovery.DiscoveryStats; +import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException; import org.elasticsearch.discovery.zen.PublishClusterStateAction.IncomingClusterStateListener; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; @@ -74,7 +75,6 @@ import java.util.List; import java.util.Locale; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -322,18 +322,19 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover } @Override - public void publish(ClusterChangedEvent clusterChangedEvent, AckListener ackListener) { + public void publish(ClusterChangedEvent clusterChangedEvent, ActionListener publishListener, AckListener ackListener) { ClusterState newState = clusterChangedEvent.state(); assert newState.getNodes().isLocalNodeElectedMaster() : "Shouldn't publish state when not master " + clusterChangedEvent.source(); - // state got changed locally (maybe because another master published to us) - if (clusterChangedEvent.previousState() != this.committedState.get()) { - throw new FailedToCommitClusterStateException("state was mutated while calculating new CS update"); - } - - pendingStatesQueue.addPending(newState); - try { + + // state got changed locally (maybe because another master published to us) + if (clusterChangedEvent.previousState() != this.committedState.get()) { + throw new FailedToCommitClusterStateException("state was mutated while calculating new CS update"); + } + + pendingStatesQueue.addPending(newState); + publishClusterState.publish(clusterChangedEvent, electMaster.minimumMasterNodes(), ackListener); } catch (FailedToCommitClusterStateException t) { // cluster service logs a WARN message @@ -346,25 +347,26 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover rejoin("zen-disco-failed-to-publish"); } - throw t; + + publishListener.onFailure(t); + return; } final DiscoveryNode localNode = newState.getNodes().getLocalNode(); - final CountDownLatch latch = new CountDownLatch(1); final AtomicBoolean processedOrFailed = new AtomicBoolean(); pendingStatesQueue.markAsCommitted(newState.stateUUID(), new PendingClusterStatesQueue.StateProcessedListener() { @Override public void onNewClusterStateProcessed() { processedOrFailed.set(true); - latch.countDown(); + publishListener.onResponse(null); ackListener.onNodeAck(localNode, null); } @Override public void onNewClusterStateFailed(Exception e) { processedOrFailed.set(true); - latch.countDown(); + publishListener.onFailure(e); ackListener.onNodeAck(localNode, e); logger.warn(() -> new ParameterizedMessage( "failed while applying cluster state locally [{}]", clusterChangedEvent.source()), e); @@ -373,7 +375,8 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover synchronized (stateMutex) { if (clusterChangedEvent.previousState() != this.committedState.get()) { - throw new FailedToCommitClusterStateException("local state was mutated while CS update was published to other nodes"); + publishListener.onFailure(new FailedToCommitClusterStateException("local state was mutated while CS update was published to other nodes")); + return; } boolean sentToApplier = processNextCommittedClusterState("master " + newState.nodes().getMasterNode() + @@ -382,17 +385,9 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover assert false : "cluster state published locally neither processed nor failed: " + newState; logger.warn("cluster state with version [{}] that is published locally has neither been processed nor failed", newState.version()); - return; + publishListener.onFailure(new FailedToCommitClusterStateException("cluster state that is published locally has neither been processed nor failed")); } } - // indefinitely wait for cluster state to be applied locally - try { - latch.await(); - } catch (InterruptedException e) { - logger.debug(() -> new ParameterizedMessage( - "interrupted while applying cluster state locally [{}]", clusterChangedEvent.source()), e); - Thread.currentThread().interrupt(); - } } /** diff --git a/server/src/main/java/org/elasticsearch/gateway/Gateway.java b/server/src/main/java/org/elasticsearch/gateway/Gateway.java index ae8f5a85def..0a6e54ea6e1 100644 --- a/server/src/main/java/org/elasticsearch/gateway/Gateway.java +++ b/server/src/main/java/org/elasticsearch/gateway/Gateway.java @@ -147,7 +147,7 @@ public class Gateway extends AbstractComponent { metaDataBuilder.transientSettings(), e -> logUnknownSetting("transient", e), (e, ex) -> logInvalidSetting("transient", e, ex))); - ClusterState.Builder builder = clusterService.newClusterStateBuilder(); + ClusterState.Builder builder = clusterService.getClusterApplierService().newClusterStateBuilder(); builder.metaData(metaDataBuilder); listener.onSuccess(builder.build()); } diff --git a/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java b/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java index 091c5846748..669d1b33c07 100644 --- a/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java +++ b/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java @@ -801,7 +801,7 @@ public class ExceptionSerializationTests extends ESTestCase { ids.put(137, org.elasticsearch.indices.TypeMissingException.class); ids.put(138, null); ids.put(139, null); - ids.put(140, org.elasticsearch.discovery.Discovery.FailedToCommitClusterStateException.class); + ids.put(140, org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException.class); ids.put(141, org.elasticsearch.index.query.QueryShardException.class); ids.put(142, ShardStateAction.NoLongerPrimaryShardException.class); ids.put(143, org.elasticsearch.script.ScriptException.class); diff --git a/server/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java b/server/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java index b27bc9ad794..1eda5d4c545 100644 --- a/server/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java @@ -40,7 +40,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.discovery.Discovery; +import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException; import org.elasticsearch.discovery.MasterNotDiscoveredException; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.Task; @@ -419,7 +419,7 @@ public class TransportMasterNodeActionTests extends ESTestCase { // The other node has become master, simulate failures of this node while publishing cluster state through ZenDiscovery setState(clusterService, ClusterStateCreationUtils.state(localNode, remoteNode, allNodes)); Exception failure = randomBoolean() - ? new Discovery.FailedToCommitClusterStateException("Fake error") + ? new FailedToCommitClusterStateException("Fake error") : new NotMasterException("Fake error"); listener.onFailure(failure); } diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 08301e99d6a..1b3ffc274b4 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -25,6 +25,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.UnavailableShardsException; import org.elasticsearch.action.admin.indices.close.CloseIndexRequest; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.PlainActionFuture; @@ -371,8 +372,11 @@ public class TransportReplicationActionTests extends ESTestCase { public void testClosedIndexOnReroute() throws InterruptedException { final String index = "test"; // no replicas in oder to skip the replication part - setState(clusterService, new ClusterStateChanges(xContentRegistry(), threadPool).closeIndices(state(index, true, - ShardRoutingState.UNASSIGNED), new CloseIndexRequest(index))); + ClusterStateChanges clusterStateChanges = new ClusterStateChanges(xContentRegistry(), threadPool); + setState(clusterService, clusterStateChanges.closeIndices( + clusterStateChanges.createIndex(clusterService.state(), new CreateIndexRequest(index)), + new CloseIndexRequest(index))); + assertThat(clusterService.state().metaData().indices().get(index).getState(), equalTo(IndexMetaData.State.CLOSE)); logger.debug("--> using initial state:\n{}", clusterService.state()); Request request = new Request(new ShardId("test", "_na_", 0)).timeout("1ms"); PlainActionFuture listener = new PlainActionFuture<>(); diff --git a/server/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java b/server/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java index 31ffb026e3a..f13a8ffd6ea 100644 --- a/server/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java @@ -25,8 +25,8 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.DiscoverySettings; +import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException; import org.elasticsearch.discovery.zen.ElectMasterService; import org.elasticsearch.discovery.zen.ZenDiscovery; import org.elasticsearch.index.query.QueryBuilders; @@ -401,7 +401,7 @@ public class MinimumMasterNodesIT extends ESIntegTestCase { logger.debug("--> waiting for cluster state to be processed/rejected"); latch.await(); - assertThat(failure.get(), instanceOf(Discovery.FailedToCommitClusterStateException.class)); + assertThat(failure.get(), instanceOf(FailedToCommitClusterStateException.class)); assertBusy(() -> assertThat(masterClusterService.state().nodes().getMasterNode(), nullValue())); partition.stopDisrupting(); diff --git a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java index 1d78cdeb983..846236d89f8 100644 --- a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java @@ -40,7 +40,7 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.discovery.Discovery; +import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; @@ -243,7 +243,7 @@ public class ShardStateActionTests extends ESTestCase { if (randomBoolean()) { transport.handleRemoteError( requestId, - randomFrom(new NotMasterException("simulated"), new Discovery.FailedToCommitClusterStateException("simulated"))); + randomFrom(new NotMasterException("simulated"), new FailedToCommitClusterStateException("simulated"))); } else { if (randomBoolean()) { transport.handleLocalError(requestId, new NodeNotConnectedException(null, "simulated")); diff --git a/server/src/test/java/org/elasticsearch/cluster/service/ClusterSerivceTests.java b/server/src/test/java/org/elasticsearch/cluster/service/ClusterSerivceTests.java index 2cebd41a52c..acafaf1dd8d 100644 --- a/server/src/test/java/org/elasticsearch/cluster/service/ClusterSerivceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/service/ClusterSerivceTests.java @@ -62,7 +62,7 @@ public class ClusterSerivceTests extends ESTestCase { ClusterService service = new ClusterService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, Collections.singletonMap("foo", () -> custom)); - ClusterState.Builder builder = service.newClusterStateBuilder(); + ClusterState.Builder builder = service.getClusterApplierService().newClusterStateBuilder(); assertSame(builder.build().custom("foo"), custom); } } diff --git a/server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java index 1ef548bd681..26944112f08 100644 --- a/server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java @@ -42,7 +42,8 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.BaseFuture; import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.discovery.Discovery; +import org.elasticsearch.cluster.coordination.ClusterStatePublisher; +import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.MockLogAppender; import org.elasticsearch.test.junit.annotations.TestLogging; @@ -71,7 +72,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiConsumer; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; @@ -122,7 +122,10 @@ public class MasterServiceTests extends ESTestCase { .masterNodeId(makeMaster ? localNode.getId() : null)) .blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK).build(); AtomicReference clusterStateRef = new AtomicReference<>(initialClusterState); - timedMasterService.setClusterStatePublisher((event, ackListener) -> clusterStateRef.set(event.state())); + timedMasterService.setClusterStatePublisher((event, publishListener, ackListener) -> { + clusterStateRef.set(event.state()); + publishListener.onResponse(null); + }); timedMasterService.setClusterStateSupplier(clusterStateRef::get); timedMasterService.start(); return timedMasterService; @@ -789,18 +792,17 @@ public class MasterServiceTests extends ESTestCase { .localNodeId(node1.getId()) .masterNodeId(node1.getId())) .blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK).build(); - final AtomicReference> publisherRef = new AtomicReference<>(); - timedMasterService.setClusterStatePublisher((cce, l) -> publisherRef.get().accept(cce, l)); + final AtomicReference publisherRef = new AtomicReference<>(); + timedMasterService.setClusterStatePublisher((e, pl, al) -> publisherRef.get().publish(e, pl, al)); timedMasterService.setClusterStateSupplier(() -> initialClusterState); timedMasterService.start(); - // check that we don't time out before even committing the cluster state { final CountDownLatch latch = new CountDownLatch(1); - publisherRef.set((clusterChangedEvent, ackListener) -> { - throw new Discovery.FailedToCommitClusterStateException("mock exception"); + publisherRef.set((clusterChangedEvent, publishListener, ackListener) -> { + publishListener.onFailure(new FailedToCommitClusterStateException("mock exception")); }); timedMasterService.submitStateUpdateTask("test2", new AckedClusterStateUpdateTask(null, null) { @@ -850,7 +852,8 @@ public class MasterServiceTests extends ESTestCase { final TimeValue ackTimeout = TimeValue.timeValueMillis(randomInt(100)); - publisherRef.set((clusterChangedEvent, ackListener) -> { + publisherRef.set((clusterChangedEvent, publishListener, ackListener) -> { + publishListener.onResponse(null); ackListener.onCommit(TimeValue.timeValueMillis(ackTimeout.millis() + randomInt(100))); ackListener.onNodeAck(node1, null); ackListener.onNodeAck(node2, null); diff --git a/server/src/test/java/org/elasticsearch/discovery/SnapshotDisruptionIT.java b/server/src/test/java/org/elasticsearch/discovery/SnapshotDisruptionIT.java index 3458cca0cf7..d8390a6c688 100644 --- a/server/src/test/java/org/elasticsearch/discovery/SnapshotDisruptionIT.java +++ b/server/src/test/java/org/elasticsearch/discovery/SnapshotDisruptionIT.java @@ -25,6 +25,7 @@ import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.SnapshotsInProgress; +import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; @@ -155,7 +156,7 @@ public class SnapshotDisruptionIT extends AbstractDisruptionTestCase { Throwable cause = ex.getCause(); assertThat(cause, instanceOf(MasterNotDiscoveredException.class)); cause = cause.getCause(); - assertThat(cause, instanceOf(Discovery.FailedToCommitClusterStateException.class)); + assertThat(cause, instanceOf(FailedToCommitClusterStateException.class)); } } diff --git a/server/src/test/java/org/elasticsearch/discovery/zen/PublishClusterStateActionTests.java b/server/src/test/java/org/elasticsearch/discovery/zen/PublishClusterStateActionTests.java index 57bde7f70cc..5e7d3b3d9b5 100644 --- a/server/src/test/java/org/elasticsearch/discovery/zen/PublishClusterStateActionTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/zen/PublishClusterStateActionTests.java @@ -45,6 +45,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.DiscoverySettings; +import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException; import org.elasticsearch.node.Node; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.junit.annotations.TestLogging; @@ -510,7 +511,7 @@ public class PublishClusterStateActionTests extends ESTestCase { try { publishStateAndWait(nodeA.action, unserializableClusterState, previousClusterState); fail("cluster state published despite of diff errors"); - } catch (Discovery.FailedToCommitClusterStateException e) { + } catch (FailedToCommitClusterStateException e) { assertThat(e.getCause(), notNullValue()); assertThat(e.getCause().getMessage(), containsString("failed to serialize")); } @@ -538,7 +539,7 @@ public class PublishClusterStateActionTests extends ESTestCase { try { publishState(master.action, clusterState, previousState, masterNodes + randomIntBetween(1, 5)); fail("cluster state publishing didn't fail despite of not having enough nodes"); - } catch (Discovery.FailedToCommitClusterStateException expected) { + } catch (FailedToCommitClusterStateException expected) { logger.debug("failed to publish as expected", expected); } } @@ -618,7 +619,7 @@ public class PublishClusterStateActionTests extends ESTestCase { if (expectingToCommit == false) { fail("cluster state publishing didn't fail despite of not have enough nodes"); } - } catch (Discovery.FailedToCommitClusterStateException exception) { + } catch (FailedToCommitClusterStateException exception) { logger.debug("failed to publish as expected", exception); if (expectingToCommit) { throw exception; @@ -696,7 +697,7 @@ public class PublishClusterStateActionTests extends ESTestCase { try { publishState(master.action, state, master.clusterState, 2).await(1, TimeUnit.HOURS); success = true; - } catch (Discovery.FailedToCommitClusterStateException OK) { + } catch (FailedToCommitClusterStateException OK) { success = false; } logger.debug("--> publishing [{}], verifying...", success ? "succeeded" : "failed"); diff --git a/server/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java b/server/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java index 6dbf80d9be6..abcdf9ae402 100644 --- a/server/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java @@ -19,8 +19,8 @@ package org.elasticsearch.discovery.zen; -import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterModule; @@ -43,7 +43,8 @@ import org.elasticsearch.cluster.service.MasterService; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.discovery.Discovery; +import org.elasticsearch.core.internal.io.IOUtils; +import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException; import org.elasticsearch.discovery.zen.PublishClusterStateActionTests.AssertingAckListener; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ClusterServiceUtils; @@ -56,6 +57,7 @@ import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportResponseOptions; import org.elasticsearch.transport.TransportService; +import org.hamcrest.core.IsInstanceOf; import java.io.Closeable; import java.io.IOException; @@ -68,6 +70,7 @@ import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; @@ -83,11 +86,11 @@ import static org.elasticsearch.cluster.routing.RoutingTableTests.updateActiveAl import static org.elasticsearch.cluster.service.MasterServiceTests.discoveryState; import static org.elasticsearch.discovery.zen.ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING; import static org.elasticsearch.discovery.zen.ZenDiscovery.shouldIgnoreOrRejectNewClusterState; -import static org.hamcrest.Matchers.arrayWithSize; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.emptyArray; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasToString; +import static org.hamcrest.Matchers.is; public class ZenDiscoveryUnitTests extends ESTestCase { @@ -225,16 +228,19 @@ public class ZenDiscoveryUnitTests extends ESTestCase { DiscoveryNodes.builder(state.nodes()).add(otherNode).masterNodeId(masterNode.getId()) ).build(); - try { - // publishing a new cluster state - ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent("testing", newState, state); - AssertingAckListener listener = new AssertingAckListener(newState.nodes().getSize() - 1); - expectedFDNodes = masterZen.getFaultDetectionNodes(); - masterZen.publish(clusterChangedEvent, listener); + // publishing a new cluster state + ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent("testing", newState, state); + AssertingAckListener listener = new AssertingAckListener(newState.nodes().getSize() - 1); + expectedFDNodes = masterZen.getFaultDetectionNodes(); + AwaitingPublishListener awaitingPublishListener = new AwaitingPublishListener(); + masterZen.publish(clusterChangedEvent, awaitingPublishListener, listener); + awaitingPublishListener.await(); + if (awaitingPublishListener.getException() == null) { + // publication succeeded, wait for acks listener.await(10, TimeUnit.SECONDS); // publish was a success, update expected FD nodes based on new cluster state expectedFDNodes = fdNodesForState(newState, masterNode); - } catch (Discovery.FailedToCommitClusterStateException e) { + } else { // not successful, so expectedFDNodes above should remain what it was originally assigned assertEquals(3, minMasterNodes); // ensure min master nodes is the higher value, otherwise we shouldn't fail } @@ -278,25 +284,50 @@ public class ZenDiscoveryUnitTests extends ESTestCase { DiscoveryNodes.builder(discoveryState(masterMasterService).nodes()).masterNodeId(masterNode.getId()) ).build(); - - try { - // publishing a new cluster state - ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent("testing", newState, state); - AssertingAckListener listener = new AssertingAckListener(newState.nodes().getSize() - 1); - masterZen.publish(clusterChangedEvent, listener); + // publishing a new cluster state + ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent("testing", newState, state); + AssertingAckListener listener = new AssertingAckListener(newState.nodes().getSize() - 1); + AwaitingPublishListener awaitingPublishListener = new AwaitingPublishListener(); + masterZen.publish(clusterChangedEvent, awaitingPublishListener, listener); + awaitingPublishListener.await(); + if (awaitingPublishListener.getException() == null) { + // publication succeeded, wait for acks listener.await(1, TimeUnit.HOURS); - // publish was a success, check that queue as cleared - assertThat(masterZen.pendingClusterStates(), emptyArray()); - } catch (Discovery.FailedToCommitClusterStateException e) { - // not successful, so the pending queue should be cleaned - assertThat(Arrays.toString(masterZen.pendingClusterStates()), masterZen.pendingClusterStates(), arrayWithSize(0)); } + // queue should be cleared whether successful or not + assertThat(Arrays.toString(masterZen.pendingClusterStates()), masterZen.pendingClusterStates(), emptyArray()); } finally { IOUtils.close(toClose); terminate(threadPool); } } + private class AwaitingPublishListener implements ActionListener { + private final CountDownLatch countDownLatch = new CountDownLatch(1); + private FailedToCommitClusterStateException exception; + + @Override + public synchronized void onResponse(Void aVoid) { + assertThat(countDownLatch.getCount(), is(1L)); + countDownLatch.countDown(); + } + + @Override + public synchronized void onFailure(Exception e) { + assertThat(e, IsInstanceOf.instanceOf(FailedToCommitClusterStateException.class)); + exception = (FailedToCommitClusterStateException) e; + onResponse(null); + } + + public void await() throws InterruptedException { + countDownLatch.await(); + } + + public synchronized FailedToCommitClusterStateException getException() { + return exception; + } + } + private ZenDiscovery buildZenDiscovery(Settings settings, TransportService service, MasterService masterService, ThreadPool threadPool) { ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/FakeThreadPoolMasterService.java b/server/src/test/java/org/elasticsearch/indices/cluster/FakeThreadPoolMasterService.java new file mode 100644 index 00000000000..519704349e1 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/indices/cluster/FakeThreadPoolMasterService.java @@ -0,0 +1,149 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.indices.cluster; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.service.MasterService; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.discovery.Discovery; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +import static org.apache.lucene.util.LuceneTestCase.random; +import static org.elasticsearch.test.ESTestCase.randomInt; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class FakeThreadPoolMasterService extends MasterService { + + private final String name; + private final List pendingTasks = new ArrayList<>(); + private final Consumer onTaskAvailableToRun; + private boolean scheduledNextTask = false; + private boolean taskInProgress = false; + private boolean waitForPublish = false; + + FakeThreadPoolMasterService(String serviceName, Consumer onTaskAvailableToRun) { + super(Settings.EMPTY, createMockThreadPool()); + this.name = serviceName; + this.onTaskAvailableToRun = onTaskAvailableToRun; + } + + private static ThreadPool createMockThreadPool() { + final ThreadContext context = new ThreadContext(Settings.EMPTY); + final ThreadPool mockThreadPool = mock(ThreadPool.class); + when(mockThreadPool.getThreadContext()).thenReturn(context); + return mockThreadPool; + } + + @Override + protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() { + return new PrioritizedEsThreadPoolExecutor(name, 1, 1, 1, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory(name), + null, null) { + + @Override + public void execute(Runnable command, final TimeValue timeout, final Runnable timeoutCallback) { + execute(command); + } + + @Override + public void execute(Runnable command) { + pendingTasks.add(command); + scheduleNextTaskIfNecessary(); + } + }; + } + + private void scheduleNextTaskIfNecessary() { + if (taskInProgress == false && pendingTasks.isEmpty() == false && scheduledNextTask == false) { + scheduledNextTask = true; + onTaskAvailableToRun.accept(() -> { + assert taskInProgress == false; + assert waitForPublish == false; + assert scheduledNextTask; + final int taskIndex = randomInt(pendingTasks.size() - 1); + logger.debug("next master service task: choosing task {} of {}", taskIndex, pendingTasks.size()); + final Runnable task = pendingTasks.remove(taskIndex); + taskInProgress = true; + scheduledNextTask = false; + task.run(); + if (waitForPublish == false) { + taskInProgress = false; + } + scheduleNextTaskIfNecessary(); + }); + } + } + + @Override + public ClusterState.Builder incrementVersion(ClusterState clusterState) { + // generate cluster UUID deterministically for repeatable tests + return ClusterState.builder(clusterState).incrementVersion().stateUUID(UUIDs.randomBase64UUID(random())); + } + + @Override + protected void publish(ClusterChangedEvent clusterChangedEvent, TaskOutputs taskOutputs, long startTimeNS) { + assert waitForPublish == false; + waitForPublish = true; + final Discovery.AckListener ackListener = taskOutputs.createAckListener(threadPool, clusterChangedEvent.state()); + clusterStatePublisher.publish(clusterChangedEvent, new ActionListener() { + + private boolean listenerCalled = false; + + @Override + public void onResponse(Void aVoid) { + assert listenerCalled == false; + listenerCalled = true; + assert waitForPublish; + waitForPublish = false; + try { + onPublicationSuccess(clusterChangedEvent, taskOutputs, startTimeNS); + } finally { + taskInProgress = false; + scheduleNextTaskIfNecessary(); + } + } + + @Override + public void onFailure(Exception e) { + assert listenerCalled == false; + listenerCalled = true; + assert waitForPublish; + waitForPublish = false; + try { + onPublicationFailed(clusterChangedEvent, taskOutputs, startTimeNS, e); + } finally { + taskInProgress = false; + scheduleNextTaskIfNecessary(); + } + } + }, ackListener); + } +} diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/FakeThreadPoolMasterServiceTests.java b/server/src/test/java/org/elasticsearch/indices/cluster/FakeThreadPoolMasterServiceTests.java new file mode 100644 index 00000000000..9d97ae2d652 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/indices/cluster/FakeThreadPoolMasterServiceTests.java @@ -0,0 +1,130 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.indices.cluster; + +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.test.ESTestCase; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static org.hamcrest.Matchers.equalTo; + +public class FakeThreadPoolMasterServiceTests extends ESTestCase { + + public void testFakeMasterService() { + List runnableTasks = new ArrayList<>(); + AtomicReference lastClusterStateRef = new AtomicReference<>(); + DiscoveryNode discoveryNode = new DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(), Collections.emptyMap(), + new HashSet<>(Arrays.asList(DiscoveryNode.Role.values())), Version.CURRENT); + lastClusterStateRef.set(ClusterStateCreationUtils.state(discoveryNode, discoveryNode)); + long firstClusterStateVersion = lastClusterStateRef.get().version(); + AtomicReference> publishingCallback = new AtomicReference<>(); + FakeThreadPoolMasterService masterService = new FakeThreadPoolMasterService("test", runnableTasks::add); + masterService.setClusterStateSupplier(lastClusterStateRef::get); + masterService.setClusterStatePublisher((event, publishListener, ackListener) -> { + lastClusterStateRef.set(event.state()); + publishingCallback.set(publishListener); + }); + masterService.start(); + + AtomicBoolean firstTaskCompleted = new AtomicBoolean(); + masterService.submitStateUpdateTask("test1", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + return ClusterState.builder(currentState) + .metaData(MetaData.builder(currentState.metaData()).put(indexBuilder("test1"))).build(); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + assertFalse(firstTaskCompleted.get()); + firstTaskCompleted.set(true); + } + + @Override + public void onFailure(String source, Exception e) { + throw new AssertionError(); + } + }); + assertThat(runnableTasks.size(), equalTo(1)); + assertThat(lastClusterStateRef.get().metaData().indices().size(), equalTo(0)); + assertThat(lastClusterStateRef.get().version(), equalTo(firstClusterStateVersion)); + assertNull(publishingCallback.get()); + assertFalse(firstTaskCompleted.get()); + + runnableTasks.remove(0).run(); + assertThat(lastClusterStateRef.get().metaData().indices().size(), equalTo(1)); + assertThat(lastClusterStateRef.get().version(), equalTo(firstClusterStateVersion + 1)); + assertNotNull(publishingCallback.get()); + assertFalse(firstTaskCompleted.get()); + assertThat(runnableTasks.size(), equalTo(0)); + + AtomicBoolean secondTaskCompleted = new AtomicBoolean(); + masterService.submitStateUpdateTask("test2", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + return ClusterState.builder(currentState) + .metaData(MetaData.builder(currentState.metaData()).put(indexBuilder("test2"))).build(); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + assertFalse(secondTaskCompleted.get()); + secondTaskCompleted.set(true); + } + + @Override + public void onFailure(String source, Exception e) { + throw new AssertionError(); + } + }); + assertThat(runnableTasks.size(), equalTo(0)); + + publishingCallback.getAndSet(null).onResponse(null); + assertTrue(firstTaskCompleted.get()); + assertThat(runnableTasks.size(), equalTo(1)); // check that new task gets queued + + runnableTasks.remove(0).run(); + assertThat(lastClusterStateRef.get().metaData().indices().size(), equalTo(2)); + assertThat(lastClusterStateRef.get().version(), equalTo(firstClusterStateVersion + 2)); + assertNotNull(publishingCallback.get()); + assertFalse(secondTaskCompleted.get()); + publishingCallback.getAndSet(null).onResponse(null); + assertTrue(secondTaskCompleted.get()); + assertThat(runnableTasks.size(), equalTo(0)); // check that no more tasks are queued + } + + private static IndexMetaData.Builder indexBuilder(String index) { + return IndexMetaData.builder(index).settings(settings(Version.CURRENT).put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)); + } +} diff --git a/server/src/test/java/org/elasticsearch/test/NoopDiscovery.java b/server/src/test/java/org/elasticsearch/test/NoopDiscovery.java index bcb9efd5b5e..cb3e7320def 100644 --- a/server/src/test/java/org/elasticsearch/test/NoopDiscovery.java +++ b/server/src/test/java/org/elasticsearch/test/NoopDiscovery.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.test; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.component.LifecycleListener; @@ -27,8 +28,8 @@ import org.elasticsearch.discovery.DiscoveryStats; public class NoopDiscovery implements Discovery { @Override - public void publish(ClusterChangedEvent clusterChangedEvent, AckListener ackListener) { - + public void publish(ClusterChangedEvent clusterChangedEvent, ActionListener publishListener, AckListener ackListener) { + publishListener.onResponse(null); } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java b/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java index 8c4076e327d..f20b0f915ed 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java @@ -21,7 +21,6 @@ package org.elasticsearch.test; import org.apache.logging.log4j.core.util.Throwables; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; -import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; @@ -36,14 +35,13 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.MasterService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.discovery.Discovery.AckListener; +import org.elasticsearch.cluster.coordination.ClusterStatePublisher; import org.elasticsearch.threadpool.ThreadPool; import java.util.Collections; import java.util.EnumSet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiConsumer; import static junit.framework.TestCase.fail; @@ -52,7 +50,10 @@ public class ClusterServiceUtils { public static MasterService createMasterService(ThreadPool threadPool, ClusterState initialClusterState) { MasterService masterService = new MasterService(Settings.EMPTY, threadPool); AtomicReference clusterStateRef = new AtomicReference<>(initialClusterState); - masterService.setClusterStatePublisher((event, ackListener) -> clusterStateRef.set(event.state())); + masterService.setClusterStatePublisher((event, publishListener, ackListener) -> { + clusterStateRef.set(event.state()); + publishListener.onResponse(null); + }); masterService.setClusterStateSupplier(clusterStateRef::get); masterService.start(); return masterService; @@ -158,33 +159,21 @@ public class ClusterServiceUtils { return clusterService; } - public static BiConsumer createClusterStatePublisher(ClusterApplier clusterApplier) { - return (event, ackListener) -> { - CountDownLatch latch = new CountDownLatch(1); - AtomicReference ex = new AtomicReference<>(); + public static ClusterStatePublisher createClusterStatePublisher(ClusterApplier clusterApplier) { + return (event, publishListener, ackListener) -> clusterApplier.onNewClusterState("mock_publish_to_self[" + event.source() + "]", () -> event.state(), new ClusterApplyListener() { @Override public void onSuccess(String source) { - latch.countDown(); + publishListener.onResponse(null); } @Override public void onFailure(String source, Exception e) { - ex.set(e); - latch.countDown(); + publishListener.onFailure(e); } } - ); - try { - latch.await(); - } catch (InterruptedException e) { - Throwables.rethrow(e); - } - if (ex.get() != null) { - Throwables.rethrow(ex.get()); - } - }; + ); } public static ClusterService createClusterService(ClusterState initialState, ThreadPool threadPool) {