Zen2: Deterministic MasterService (#32493)

Increases testability of MasterService and the discovery layer. Changes:
- Async publish method
- Moved a few interfaces/classes top-level to simplify imports
- Deterministic MasterService implementation for tests
This commit is contained in:
Yannick Welsch 2018-08-13 18:03:08 +02:00 committed by GitHub
parent 433b7b8427
commit e122505a91
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
29 changed files with 636 additions and 243 deletions

View File

@ -21,7 +21,6 @@ package org.elasticsearch;
import org.elasticsearch.action.support.replication.ReplicationOperation;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.coordination.CoordinationStateRejectedException;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
@ -1006,8 +1005,8 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
UNKNOWN_VERSION_ADDED),
TYPE_MISSING_EXCEPTION(org.elasticsearch.indices.TypeMissingException.class,
org.elasticsearch.indices.TypeMissingException::new, 137, UNKNOWN_VERSION_ADDED),
FAILED_TO_COMMIT_CLUSTER_STATE_EXCEPTION(org.elasticsearch.discovery.Discovery.FailedToCommitClusterStateException.class,
org.elasticsearch.discovery.Discovery.FailedToCommitClusterStateException::new, 140, UNKNOWN_VERSION_ADDED),
FAILED_TO_COMMIT_CLUSTER_STATE_EXCEPTION(org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException.class,
org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException::new, 140, UNKNOWN_VERSION_ADDED),
QUERY_SHARD_EXCEPTION(org.elasticsearch.index.query.QueryShardException.class,
org.elasticsearch.index.query.QueryShardException::new, 141, UNKNOWN_VERSION_ADDED),
NO_LONGER_PRIMARY_SHARD_EXCEPTION(ShardStateAction.NoLongerPrimaryShardException.class,
@ -1026,8 +1025,8 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
org.elasticsearch.common.xcontent.UnknownNamedObjectException::new, 148, Version.V_5_2_0),
TOO_MANY_BUCKETS_EXCEPTION(MultiBucketConsumerService.TooManyBucketsException.class,
MultiBucketConsumerService.TooManyBucketsException::new, 149, Version.V_7_0_0_alpha1),
COORDINATION_STATE_REJECTED_EXCEPTION(CoordinationStateRejectedException.class,
CoordinationStateRejectedException::new, 150, Version.V_7_0_0_alpha1);
COORDINATION_STATE_REJECTED_EXCEPTION(org.elasticsearch.cluster.coordination.CoordinationStateRejectedException.class,
org.elasticsearch.cluster.coordination.CoordinationStateRejectedException::new, 150, Version.V_7_0_0_alpha1);
final Class<? extends ElasticsearchException> exceptionClass;

View File

@ -38,7 +38,7 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.tasks.Task;
@ -174,7 +174,7 @@ public abstract class TransportMasterNodeAction<Request extends MasterNodeReques
@Override
public void onFailure(Exception t) {
if (t instanceof Discovery.FailedToCommitClusterStateException
if (t instanceof FailedToCommitClusterStateException
|| (t instanceof NotMasterException)) {
logger.debug(() -> new ParameterizedMessage("master could not publish cluster state or stepped down before publishing action [{}], scheduling a retry", actionName), t);
retry(t, masterChangePredicate);

View File

@ -49,7 +49,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.tasks.Task;
@ -132,7 +132,7 @@ public class ShardStateAction extends AbstractComponent {
private static Class[] MASTER_CHANNEL_EXCEPTIONS = new Class[]{
NotMasterException.class,
ConnectTransportException.class,
Discovery.FailedToCommitClusterStateException.class
FailedToCommitClusterStateException.class
};
private static boolean isMasterChannelException(TransportException exp) {
@ -625,7 +625,7 @@ public class ShardStateAction extends AbstractComponent {
* are:
* - {@link NotMasterException}
* - {@link NodeDisconnectedException}
* - {@link Discovery.FailedToCommitClusterStateException}
* - {@link FailedToCommitClusterStateException}
*
* Any other exception is communicated to the requester via
* this notification.

View File

@ -0,0 +1,58 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.coordination;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.unit.TimeValue;
public interface ClusterStatePublisher {
/**
* Publish all the changes to the cluster from the master (can be called just by the master). The publish
* process should apply this state to the master as well!
*
* The publishListener allows to wait for the publication to complete, which can be either successful completion, timing out or failing.
* The method is guaranteed to pass back a {@link FailedToCommitClusterStateException} to the publishListener if the change is not
* committed and should be rejected. Any other exception signals that something bad happened but the change is committed.
*
* The {@link AckListener} allows to keep track of the ack received from nodes, and verify whether
* they updated their own cluster state or not.
*/
void publish(ClusterChangedEvent clusterChangedEvent, ActionListener<Void> publishListener, AckListener ackListener);
interface AckListener {
/**
* Should be called when the cluster coordination layer has committed the cluster state (i.e. even if this publication fails,
* it is guaranteed to appear in future publications).
* @param commitTime the time it took to commit the cluster state
*/
void onCommit(TimeValue commitTime);
/**
* Should be called whenever the cluster coordination layer receives confirmation from a node that it has successfully applied
* the cluster state. In case of failures, an exception should be provided as parameter.
* @param node the node
* @param e the optional exception
*/
void onNodeAck(DiscoveryNode node, @Nullable Exception e);
}
}

View File

@ -0,0 +1,43 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.coordination;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.coordination.ClusterStatePublisher;
import org.elasticsearch.common.io.stream.StreamInput;
import java.io.IOException;
/**
* Thrown when failing to publish a cluster state. See {@link ClusterStatePublisher} for more details.
*/
public class FailedToCommitClusterStateException extends ElasticsearchException {
public FailedToCommitClusterStateException(StreamInput in) throws IOException {
super(in);
}
public FailedToCommitClusterStateException(String msg, Object... args) {
super(msg, args);
}
public FailedToCommitClusterStateException(String msg, Throwable cause, Object... args) {
super(msg, cause, args);
}
}

View File

@ -22,12 +22,11 @@ package org.elasticsearch.cluster.coordination;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.coordination.ClusterStatePublisher.AckListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.Discovery.AckListener;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponse;
@ -145,7 +144,7 @@ public abstract class Publication extends AbstractComponent {
if (isPublishQuorum(possiblySuccessfulNodes) == false) {
logger.debug("onPossibleCommitFailure: non-failed nodes {} do not form a quorum, so {} cannot succeed",
possiblySuccessfulNodes, this);
Exception e = new Discovery.FailedToCommitClusterStateException("non-failed nodes do not form a quorum");
Exception e = new FailedToCommitClusterStateException("non-failed nodes do not form a quorum");
publicationTargets.stream().filter(PublicationTarget::isActive).forEach(pt -> pt.setFailed(e));
onPossibleCompletion();
}

View File

@ -59,25 +59,31 @@ public class ClusterService extends AbstractLifecycleComponent {
private final OperationRouting operationRouting;
private final ClusterSettings clusterSettings;
private final Map<String, Supplier<ClusterState.Custom>> initialClusterStateCustoms;
public ClusterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool,
Map<String, Supplier<ClusterState.Custom>> initialClusterStateCustoms) {
this(settings, clusterSettings, new MasterService(settings, threadPool),
new ClusterApplierService(settings, clusterSettings, threadPool,
() -> ClusterService.newClusterStateBuilder(settings, initialClusterStateCustoms)));
}
public ClusterService(Settings settings, ClusterSettings clusterSettings,
MasterService masterService, ClusterApplierService clusterApplierService) {
super(settings);
this.masterService = new MasterService(settings, threadPool);
this.masterService = masterService;
this.operationRouting = new OperationRouting(settings, clusterSettings);
this.clusterSettings = clusterSettings;
this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
this.clusterSettings.addSettingsUpdateConsumer(CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
this::setSlowTaskLoggingThreshold);
this.initialClusterStateCustoms = initialClusterStateCustoms;
this.clusterApplierService = new ClusterApplierService(settings, clusterSettings, threadPool, this::newClusterStateBuilder);
this.clusterApplierService = clusterApplierService;
}
/**
* Creates a new cluster state builder that is initialized with the cluster name and all initial cluster state customs.
*/
public ClusterState.Builder newClusterStateBuilder() {
private static ClusterState.Builder newClusterStateBuilder(Settings settings,
Map<String, Supplier<ClusterState.Custom>> initialClusterStateCustoms) {
ClusterState.Builder builder = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings));
for (Map.Entry<String, Supplier<ClusterState.Custom>> entry : initialClusterStateCustoms.entrySet()) {
builder.putCustom(entry.getKey(), entry.getValue().get());

View File

@ -22,6 +22,7 @@ package org.elasticsearch.cluster.service;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.Assertions;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.AckedClusterStateTaskListener;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
@ -48,7 +49,9 @@ import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.cluster.coordination.ClusterStatePublisher;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Arrays;
@ -59,7 +62,6 @@ import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@ -70,7 +72,7 @@ public class MasterService extends AbstractLifecycleComponent {
public static final String MASTER_UPDATE_THREAD_NAME = "masterService#updateTask";
private BiConsumer<ClusterChangedEvent, Discovery.AckListener> clusterStatePublisher;
protected ClusterStatePublisher clusterStatePublisher;
private java.util.function.Supplier<ClusterState> clusterStateSupplier;
@ -92,7 +94,7 @@ public class MasterService extends AbstractLifecycleComponent {
this.slowTaskLoggingThreshold = slowTaskLoggingThreshold;
}
public synchronized void setClusterStatePublisher(BiConsumer<ClusterChangedEvent, Discovery.AckListener> publisher) {
public synchronized void setClusterStatePublisher(ClusterStatePublisher publisher) {
clusterStatePublisher = publisher;
}
@ -104,12 +106,16 @@ public class MasterService extends AbstractLifecycleComponent {
protected synchronized void doStart() {
Objects.requireNonNull(clusterStatePublisher, "please set a cluster state publisher before starting");
Objects.requireNonNull(clusterStateSupplier, "please set a cluster state supplier before starting");
threadPoolExecutor = EsExecutors.newSinglePrioritizing(
threadPoolExecutor = createThreadPoolExecutor();
taskBatcher = new Batcher(logger, threadPoolExecutor);
}
protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
return EsExecutors.newSinglePrioritizing(
nodeName() + "/" + MASTER_UPDATE_THREAD_NAME,
daemonThreadFactory(settings, MASTER_UPDATE_THREAD_NAME),
threadPool.getThreadContext(),
threadPool.scheduler());
taskBatcher = new Batcher(logger, threadPoolExecutor);
}
class Batcher extends TaskBatcher {
@ -166,14 +172,17 @@ public class MasterService extends AbstractLifecycleComponent {
return clusterStateSupplier.get();
}
private static boolean isMasterUpdateThread() {
return Thread.currentThread().getName().contains(MASTER_UPDATE_THREAD_NAME);
}
public static boolean assertMasterUpdateThread() {
assert Thread.currentThread().getName().contains(MASTER_UPDATE_THREAD_NAME) :
"not called from the master service thread";
assert isMasterUpdateThread() : "not called from the master service thread";
return true;
}
public static boolean assertNotMasterUpdateThread(String reason) {
assert Thread.currentThread().getName().contains(MASTER_UPDATE_THREAD_NAME) == false :
assert isMasterUpdateThread() == false :
"Expected current thread [" + Thread.currentThread() + "] to not be the master service thread. Reason: [" + reason + "]";
return true;
}
@ -222,31 +231,62 @@ public class MasterService extends AbstractLifecycleComponent {
}
logger.debug("publishing cluster state version [{}]", newClusterState.version());
try {
clusterStatePublisher.accept(clusterChangedEvent, taskOutputs.createAckListener(threadPool, newClusterState));
} catch (Discovery.FailedToCommitClusterStateException t) {
final long version = newClusterState.version();
logger.warn(() -> new ParameterizedMessage(
"failing [{}]: failed to commit cluster state version [{}]", summary, version), t);
taskOutputs.publishingFailed(t);
return;
publish(clusterChangedEvent, taskOutputs, startTimeNS);
} catch (Exception e) {
handleException(summary, startTimeNS, newClusterState, e);
}
}
}
taskOutputs.processedDifferentClusterState(previousClusterState, newClusterState);
protected void publish(ClusterChangedEvent clusterChangedEvent, TaskOutputs taskOutputs, long startTimeNS) {
final PlainActionFuture<Void> fut = new PlainActionFuture<Void>() {
@Override
protected boolean blockingAllowed() {
return isMasterUpdateThread() || super.blockingAllowed();
}
};
clusterStatePublisher.publish(clusterChangedEvent, fut, taskOutputs.createAckListener(threadPool, clusterChangedEvent.state()));
// indefinitely wait for publication to complete
try {
FutureUtils.get(fut);
onPublicationSuccess(clusterChangedEvent, taskOutputs, startTimeNS);
} catch (Exception e) {
onPublicationFailed(clusterChangedEvent, taskOutputs, startTimeNS, e);
}
}
protected void onPublicationSuccess(ClusterChangedEvent clusterChangedEvent, TaskOutputs taskOutputs, long startTimeNS) {
taskOutputs.processedDifferentClusterState(clusterChangedEvent.previousState(), clusterChangedEvent.state());
try {
taskOutputs.clusterStatePublished(clusterChangedEvent);
} catch (Exception e) {
logger.error(() -> new ParameterizedMessage(
"exception thrown while notifying executor of new cluster state publication [{}]",
summary), e);
clusterChangedEvent.source()), e);
}
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
logger.debug("processing [{}]: took [{}] done publishing updated cluster state (version: {}, uuid: {})", summary,
executionTime, newClusterState.version(),
newClusterState.stateUUID());
warnAboutSlowTaskIfNeeded(executionTime, summary);
} catch (Exception e) {
logger.debug("processing [{}]: took [{}] done publishing updated cluster state (version: {}, uuid: {})",
clusterChangedEvent.source(),
executionTime, clusterChangedEvent.state().version(),
clusterChangedEvent.state().stateUUID());
warnAboutSlowTaskIfNeeded(executionTime, clusterChangedEvent.source());
}
protected void onPublicationFailed(ClusterChangedEvent clusterChangedEvent, TaskOutputs taskOutputs, long startTimeNS,
Exception exception) {
if (exception instanceof FailedToCommitClusterStateException) {
final long version = clusterChangedEvent.state().version();
logger.warn(() -> new ParameterizedMessage(
"failing [{}]: failed to commit cluster state version [{}]", clusterChangedEvent.source(), version), exception);
taskOutputs.publishingFailed((FailedToCommitClusterStateException) exception);
} else {
handleException(clusterChangedEvent.source(), startTimeNS, clusterChangedEvent.state(), exception);
}
}
private void handleException(String summary, long startTimeNS, ClusterState newClusterState, Exception e) {
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
final long version = newClusterState.version();
final String stateUUID = newClusterState.stateUUID();
@ -261,8 +301,6 @@ public class MasterService extends AbstractLifecycleComponent {
e);
// TODO: do we want to call updateTask.onFailure here?
}
}
}
public TaskOutputs calculateTaskOutputs(TaskInputs taskInputs, ClusterState previousClusterState, long startTimeNS) {
ClusterTasksResult<Object> clusterTasksResult = executeTasks(taskInputs, startTimeNS, previousClusterState);
@ -276,7 +314,7 @@ public class MasterService extends AbstractLifecycleComponent {
if (previousClusterState != newClusterState) {
// only the master controls the version numbers
Builder builder = ClusterState.builder(newClusterState).incrementVersion();
Builder builder = incrementVersion(newClusterState);
if (previousClusterState.routingTable() != newClusterState.routingTable()) {
builder.routingTable(RoutingTable.builder(newClusterState.routingTable())
.version(newClusterState.routingTable().version() + 1).build());
@ -291,6 +329,10 @@ public class MasterService extends AbstractLifecycleComponent {
return newClusterState;
}
protected Builder incrementVersion(ClusterState clusterState) {
return ClusterState.builder(clusterState).incrementVersion();
}
/**
* Submits a cluster state update task; unlike {@link #submitStateUpdateTask(String, Object, ClusterStateTaskConfig,
* ClusterStateTaskExecutor, ClusterStateTaskListener)}, submitted updates will not be batched.
@ -335,7 +377,7 @@ public class MasterService extends AbstractLifecycleComponent {
/**
* Output created by executing a set of tasks provided as TaskInputs
*/
class TaskOutputs {
protected class TaskOutputs {
public final TaskInputs taskInputs;
public final ClusterState previousClusterState;
public final ClusterState newClusterState;
@ -353,7 +395,7 @@ public class MasterService extends AbstractLifecycleComponent {
this.executionResults = executionResults;
}
public void publishingFailed(Discovery.FailedToCommitClusterStateException t) {
public void publishingFailed(FailedToCommitClusterStateException t) {
nonFailedTasks.forEach(task -> task.listener.onFailure(task.source(), t));
}

View File

@ -61,11 +61,7 @@ public abstract class BaseFuture<V> implements Future<V> {
@Override
public V get(long timeout, TimeUnit unit) throws InterruptedException,
TimeoutException, ExecutionException {
assert timeout <= 0 ||
(Transports.assertNotTransportThread(BLOCKING_OP_REASON) &&
ThreadPool.assertNotScheduleThread(BLOCKING_OP_REASON) &&
ClusterApplierService.assertNotClusterStateUpdateThread(BLOCKING_OP_REASON) &&
MasterService.assertNotMasterUpdateThread(BLOCKING_OP_REASON));
assert timeout <= 0 || blockingAllowed();
return sync.get(unit.toNanos(timeout));
}
@ -87,11 +83,16 @@ public abstract class BaseFuture<V> implements Future<V> {
*/
@Override
public V get() throws InterruptedException, ExecutionException {
assert Transports.assertNotTransportThread(BLOCKING_OP_REASON) &&
assert blockingAllowed();
return sync.get();
}
// protected so that it can be overridden in specific instances
protected boolean blockingAllowed() {
return Transports.assertNotTransportThread(BLOCKING_OP_REASON) &&
ThreadPool.assertNotScheduleThread(BLOCKING_OP_REASON) &&
ClusterApplierService.assertNotClusterStateUpdateThread(BLOCKING_OP_REASON) &&
MasterService.assertNotMasterUpdateThread(BLOCKING_OP_REASON);
return sync.get();
}
@Override

View File

@ -48,7 +48,7 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
private final Queue<Runnable> current = ConcurrentCollections.newQueue();
private final ScheduledExecutorService timer;
PrioritizedEsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
public PrioritizedEsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
ThreadFactory threadFactory, ThreadContext contextHolder, ScheduledExecutorService timer) {
super(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue<>(), threadFactory, contextHolder);
this.timer = timer;

View File

@ -19,66 +19,15 @@
package org.elasticsearch.discovery;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.cluster.coordination.ClusterStatePublisher;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.unit.TimeValue;
import java.io.IOException;
/**
* A pluggable module allowing to implement discovery of other nodes, publishing of the cluster
* state to all nodes, electing a master of the cluster that raises cluster state change
* events.
*/
public interface Discovery extends LifecycleComponent {
/**
* Publish all the changes to the cluster from the master (can be called just by the master). The publish
* process should apply this state to the master as well!
*
* The {@link AckListener} allows to keep track of the ack received from nodes, and verify whether
* they updated their own cluster state or not.
*
* The method is guaranteed to throw a {@link FailedToCommitClusterStateException} if the change is not committed and should be rejected.
* Any other exception signals the something wrong happened but the change is committed.
*/
void publish(ClusterChangedEvent clusterChangedEvent, AckListener ackListener);
interface AckListener {
/**
* Should be called when the discovery layer has committed the clusters state (i.e. even if this publication fails,
* it is guaranteed to appear in future publications).
* @param commitTime the time it took to commit the cluster state
*/
void onCommit(TimeValue commitTime);
/**
* Should be called whenever the discovery layer receives confirmation from a node that it has successfully applied
* the cluster state. In case of failures, an exception should be provided as parameter.
* @param node the node
* @param e the optional exception
*/
void onNodeAck(DiscoveryNode node, @Nullable Exception e);
}
class FailedToCommitClusterStateException extends ElasticsearchException {
public FailedToCommitClusterStateException(StreamInput in) throws IOException {
super(in);
}
public FailedToCommitClusterStateException(String msg, Object... args) {
super(msg, args);
}
public FailedToCommitClusterStateException(String msg, Throwable cause, Object... args) {
super(msg, cause, args);
}
}
public interface Discovery extends LifecycleComponent, ClusterStatePublisher {
/**
* @return stats about the discovery

View File

@ -20,6 +20,7 @@
package org.elasticsearch.discovery.single;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlocks;
@ -35,9 +36,7 @@ import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoveryStats;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
@ -59,33 +58,25 @@ public class SingleNodeDiscovery extends AbstractLifecycleComponent implements D
}
@Override
public synchronized void publish(final ClusterChangedEvent event,
public synchronized void publish(final ClusterChangedEvent event, ActionListener<Void> publishListener,
final AckListener ackListener) {
clusterState = event.state();
ackListener.onCommit(TimeValue.ZERO);
CountDownLatch latch = new CountDownLatch(1);
ClusterApplyListener listener = new ClusterApplyListener() {
clusterApplier.onNewClusterState("apply-locally-on-node[" + event.source() + "]", () -> clusterState, new ClusterApplyListener() {
@Override
public void onSuccess(String source) {
latch.countDown();
publishListener.onResponse(null);
ackListener.onNodeAck(transportService.getLocalNode(), null);
}
@Override
public void onFailure(String source, Exception e) {
latch.countDown();
publishListener.onFailure(e);
ackListener.onNodeAck(transportService.getLocalNode(), e);
logger.warn(() -> new ParameterizedMessage("failed while applying cluster state locally [{}]", event.source()), e);
}
};
clusterApplier.onNewClusterState("apply-locally-on-node[" + event.source() + "]", () -> clusterState, listener);
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
@Override
@ -129,7 +120,7 @@ public class SingleNodeDiscovery extends AbstractLifecycleComponent implements D
}
@Override
protected void doClose() throws IOException {
protected void doClose() {
}

View File

@ -45,6 +45,7 @@ import org.elasticsearch.discovery.AckClusterStatePublishResponseHandler;
import org.elasticsearch.discovery.BlockingClusterStatePublishResponseHandler;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BytesTransportRequest;
@ -117,12 +118,12 @@ public class PublishClusterStateAction extends AbstractComponent {
* publishes a cluster change event to other nodes. if at least minMasterNodes acknowledge the change it is committed and will
* be processed by the master and the other nodes.
* <p>
* The method is guaranteed to throw a {@link org.elasticsearch.discovery.Discovery.FailedToCommitClusterStateException}
* The method is guaranteed to throw a {@link FailedToCommitClusterStateException}
* if the change is not committed and should be rejected.
* Any other exception signals the something wrong happened but the change is committed.
*/
public void publish(final ClusterChangedEvent clusterChangedEvent, final int minMasterNodes,
final Discovery.AckListener ackListener) throws Discovery.FailedToCommitClusterStateException {
final Discovery.AckListener ackListener) throws FailedToCommitClusterStateException {
final DiscoveryNodes nodes;
final SendingController sendingController;
final Set<DiscoveryNode> nodesToPublishTo;
@ -155,19 +156,19 @@ public class PublishClusterStateAction extends AbstractComponent {
sendingController = new SendingController(clusterChangedEvent.state(), minMasterNodes,
totalMasterNodes, publishResponseHandler);
} catch (Exception e) {
throw new Discovery.FailedToCommitClusterStateException("unexpected error while preparing to publish", e);
throw new FailedToCommitClusterStateException("unexpected error while preparing to publish", e);
}
try {
innerPublish(clusterChangedEvent, nodesToPublishTo, sendingController, ackListener, sendFullVersion, serializedStates,
serializedDiffs);
} catch (Discovery.FailedToCommitClusterStateException t) {
} catch (FailedToCommitClusterStateException t) {
throw t;
} catch (Exception e) {
// try to fail committing, in cause it's still on going
if (sendingController.markAsFailed("unexpected error", e)) {
// signal the change should be rejected
throw new Discovery.FailedToCommitClusterStateException("unexpected error", e);
throw new FailedToCommitClusterStateException("unexpected error", e);
} else {
throw e;
}
@ -518,7 +519,7 @@ public class PublishClusterStateAction extends AbstractComponent {
this.neededMastersToCommit = Math.max(0, minMasterNodes - 1); // we are one of the master nodes
this.pendingMasterNodes = totalMasterNodes - 1;
if (this.neededMastersToCommit > this.pendingMasterNodes) {
throw new Discovery.FailedToCommitClusterStateException("not enough masters to ack sent cluster state." +
throw new FailedToCommitClusterStateException("not enough masters to ack sent cluster state." +
"[{}] needed , have [{}]", neededMastersToCommit, pendingMasterNodes);
}
this.committed = neededMastersToCommit == 0;
@ -537,7 +538,7 @@ public class PublishClusterStateAction extends AbstractComponent {
markAsFailed("timed out waiting for commit (commit timeout [" + commitTimeout + "])");
}
if (isCommitted() == false) {
throw new Discovery.FailedToCommitClusterStateException("{} enough masters to ack sent cluster state. [{}] left",
throw new FailedToCommitClusterStateException("{} enough masters to ack sent cluster state. [{}] left",
timedout ? "timed out while waiting for" : "failed to get", neededMastersToCommit);
}
}

View File

@ -21,7 +21,6 @@ package org.elasticsearch.discovery.zen;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
@ -52,9 +51,11 @@ import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.DiscoveryStats;
import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
import org.elasticsearch.discovery.zen.PublishClusterStateAction.IncomingClusterStateListener;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
@ -74,7 +75,6 @@ import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -322,10 +322,12 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
}
@Override
public void publish(ClusterChangedEvent clusterChangedEvent, AckListener ackListener) {
public void publish(ClusterChangedEvent clusterChangedEvent, ActionListener<Void> publishListener, AckListener ackListener) {
ClusterState newState = clusterChangedEvent.state();
assert newState.getNodes().isLocalNodeElectedMaster() : "Shouldn't publish state when not master " + clusterChangedEvent.source();
try {
// state got changed locally (maybe because another master published to us)
if (clusterChangedEvent.previousState() != this.committedState.get()) {
throw new FailedToCommitClusterStateException("state was mutated while calculating new CS update");
@ -333,7 +335,6 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
pendingStatesQueue.addPending(newState);
try {
publishClusterState.publish(clusterChangedEvent, electMaster.minimumMasterNodes(), ackListener);
} catch (FailedToCommitClusterStateException t) {
// cluster service logs a WARN message
@ -346,25 +347,26 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
rejoin("zen-disco-failed-to-publish");
}
throw t;
publishListener.onFailure(t);
return;
}
final DiscoveryNode localNode = newState.getNodes().getLocalNode();
final CountDownLatch latch = new CountDownLatch(1);
final AtomicBoolean processedOrFailed = new AtomicBoolean();
pendingStatesQueue.markAsCommitted(newState.stateUUID(),
new PendingClusterStatesQueue.StateProcessedListener() {
@Override
public void onNewClusterStateProcessed() {
processedOrFailed.set(true);
latch.countDown();
publishListener.onResponse(null);
ackListener.onNodeAck(localNode, null);
}
@Override
public void onNewClusterStateFailed(Exception e) {
processedOrFailed.set(true);
latch.countDown();
publishListener.onFailure(e);
ackListener.onNodeAck(localNode, e);
logger.warn(() -> new ParameterizedMessage(
"failed while applying cluster state locally [{}]", clusterChangedEvent.source()), e);
@ -373,7 +375,8 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
synchronized (stateMutex) {
if (clusterChangedEvent.previousState() != this.committedState.get()) {
throw new FailedToCommitClusterStateException("local state was mutated while CS update was published to other nodes");
publishListener.onFailure(new FailedToCommitClusterStateException("local state was mutated while CS update was published to other nodes"));
return;
}
boolean sentToApplier = processNextCommittedClusterState("master " + newState.nodes().getMasterNode() +
@ -382,17 +385,9 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
assert false : "cluster state published locally neither processed nor failed: " + newState;
logger.warn("cluster state with version [{}] that is published locally has neither been processed nor failed",
newState.version());
return;
publishListener.onFailure(new FailedToCommitClusterStateException("cluster state that is published locally has neither been processed nor failed"));
}
}
// indefinitely wait for cluster state to be applied locally
try {
latch.await();
} catch (InterruptedException e) {
logger.debug(() -> new ParameterizedMessage(
"interrupted while applying cluster state locally [{}]", clusterChangedEvent.source()), e);
Thread.currentThread().interrupt();
}
}
/**

View File

@ -147,7 +147,7 @@ public class Gateway extends AbstractComponent {
metaDataBuilder.transientSettings(),
e -> logUnknownSetting("transient", e),
(e, ex) -> logInvalidSetting("transient", e, ex)));
ClusterState.Builder builder = clusterService.newClusterStateBuilder();
ClusterState.Builder builder = clusterService.getClusterApplierService().newClusterStateBuilder();
builder.metaData(metaDataBuilder);
listener.onSuccess(builder.build());
}

View File

@ -801,7 +801,7 @@ public class ExceptionSerializationTests extends ESTestCase {
ids.put(137, org.elasticsearch.indices.TypeMissingException.class);
ids.put(138, null);
ids.put(139, null);
ids.put(140, org.elasticsearch.discovery.Discovery.FailedToCommitClusterStateException.class);
ids.put(140, org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException.class);
ids.put(141, org.elasticsearch.index.query.QueryShardException.class);
ids.put(142, ShardStateAction.NoLongerPrimaryShardException.class);
ids.put(143, org.elasticsearch.script.ScriptException.class);

View File

@ -40,7 +40,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.Task;
@ -419,7 +419,7 @@ public class TransportMasterNodeActionTests extends ESTestCase {
// The other node has become master, simulate failures of this node while publishing cluster state through ZenDiscovery
setState(clusterService, ClusterStateCreationUtils.state(localNode, remoteNode, allNodes));
Exception failure = randomBoolean()
? new Discovery.FailedToCommitClusterStateException("Fake error")
? new FailedToCommitClusterStateException("Fake error")
: new NotMasterException("Fake error");
listener.onFailure(failure);
}

View File

@ -25,6 +25,7 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.PlainActionFuture;
@ -371,8 +372,11 @@ public class TransportReplicationActionTests extends ESTestCase {
public void testClosedIndexOnReroute() throws InterruptedException {
final String index = "test";
// no replicas in oder to skip the replication part
setState(clusterService, new ClusterStateChanges(xContentRegistry(), threadPool).closeIndices(state(index, true,
ShardRoutingState.UNASSIGNED), new CloseIndexRequest(index)));
ClusterStateChanges clusterStateChanges = new ClusterStateChanges(xContentRegistry(), threadPool);
setState(clusterService, clusterStateChanges.closeIndices(
clusterStateChanges.createIndex(clusterService.state(), new CreateIndexRequest(index)),
new CloseIndexRequest(index)));
assertThat(clusterService.state().metaData().indices().get(index).getState(), equalTo(IndexMetaData.State.CLOSE));
logger.debug("--> using initial state:\n{}", clusterService.state());
Request request = new Request(new ShardId("test", "_na_", 0)).timeout("1ms");
PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();

View File

@ -25,8 +25,8 @@ import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
import org.elasticsearch.discovery.zen.ElectMasterService;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.index.query.QueryBuilders;
@ -401,7 +401,7 @@ public class MinimumMasterNodesIT extends ESIntegTestCase {
logger.debug("--> waiting for cluster state to be processed/rejected");
latch.await();
assertThat(failure.get(), instanceOf(Discovery.FailedToCommitClusterStateException.class));
assertThat(failure.get(), instanceOf(FailedToCommitClusterStateException.class));
assertBusy(() -> assertThat(masterClusterService.state().nodes().getMasterNode(), nullValue()));
partition.stopDisrupting();

View File

@ -40,7 +40,7 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
@ -243,7 +243,7 @@ public class ShardStateActionTests extends ESTestCase {
if (randomBoolean()) {
transport.handleRemoteError(
requestId,
randomFrom(new NotMasterException("simulated"), new Discovery.FailedToCommitClusterStateException("simulated")));
randomFrom(new NotMasterException("simulated"), new FailedToCommitClusterStateException("simulated")));
} else {
if (randomBoolean()) {
transport.handleLocalError(requestId, new NodeNotConnectedException(null, "simulated"));

View File

@ -62,7 +62,7 @@ public class ClusterSerivceTests extends ESTestCase {
ClusterService service = new ClusterService(Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, Collections.singletonMap("foo", () ->
custom));
ClusterState.Builder builder = service.newClusterStateBuilder();
ClusterState.Builder builder = service.getClusterApplierService().newClusterStateBuilder();
assertSame(builder.build().custom("foo"), custom);
}
}

View File

@ -42,7 +42,8 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.BaseFuture;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.cluster.coordination.ClusterStatePublisher;
import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.MockLogAppender;
import org.elasticsearch.test.junit.annotations.TestLogging;
@ -71,7 +72,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
@ -122,7 +122,10 @@ public class MasterServiceTests extends ESTestCase {
.masterNodeId(makeMaster ? localNode.getId() : null))
.blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK).build();
AtomicReference<ClusterState> clusterStateRef = new AtomicReference<>(initialClusterState);
timedMasterService.setClusterStatePublisher((event, ackListener) -> clusterStateRef.set(event.state()));
timedMasterService.setClusterStatePublisher((event, publishListener, ackListener) -> {
clusterStateRef.set(event.state());
publishListener.onResponse(null);
});
timedMasterService.setClusterStateSupplier(clusterStateRef::get);
timedMasterService.start();
return timedMasterService;
@ -789,18 +792,17 @@ public class MasterServiceTests extends ESTestCase {
.localNodeId(node1.getId())
.masterNodeId(node1.getId()))
.blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK).build();
final AtomicReference<BiConsumer<ClusterChangedEvent, Discovery.AckListener>> publisherRef = new AtomicReference<>();
timedMasterService.setClusterStatePublisher((cce, l) -> publisherRef.get().accept(cce, l));
final AtomicReference<ClusterStatePublisher> publisherRef = new AtomicReference<>();
timedMasterService.setClusterStatePublisher((e, pl, al) -> publisherRef.get().publish(e, pl, al));
timedMasterService.setClusterStateSupplier(() -> initialClusterState);
timedMasterService.start();
// check that we don't time out before even committing the cluster state
{
final CountDownLatch latch = new CountDownLatch(1);
publisherRef.set((clusterChangedEvent, ackListener) -> {
throw new Discovery.FailedToCommitClusterStateException("mock exception");
publisherRef.set((clusterChangedEvent, publishListener, ackListener) -> {
publishListener.onFailure(new FailedToCommitClusterStateException("mock exception"));
});
timedMasterService.submitStateUpdateTask("test2", new AckedClusterStateUpdateTask<Void>(null, null) {
@ -850,7 +852,8 @@ public class MasterServiceTests extends ESTestCase {
final TimeValue ackTimeout = TimeValue.timeValueMillis(randomInt(100));
publisherRef.set((clusterChangedEvent, ackListener) -> {
publisherRef.set((clusterChangedEvent, publishListener, ackListener) -> {
publishListener.onResponse(null);
ackListener.onCommit(TimeValue.timeValueMillis(ackTimeout.millis() + randomInt(100)));
ackListener.onNodeAck(node1, null);
ackListener.onNodeAck(node2, null);

View File

@ -25,6 +25,7 @@ import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
@ -155,7 +156,7 @@ public class SnapshotDisruptionIT extends AbstractDisruptionTestCase {
Throwable cause = ex.getCause();
assertThat(cause, instanceOf(MasterNotDiscoveredException.class));
cause = cause.getCause();
assertThat(cause, instanceOf(Discovery.FailedToCommitClusterStateException.class));
assertThat(cause, instanceOf(FailedToCommitClusterStateException.class));
}
}

View File

@ -45,6 +45,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
import org.elasticsearch.node.Node;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.junit.annotations.TestLogging;
@ -510,7 +511,7 @@ public class PublishClusterStateActionTests extends ESTestCase {
try {
publishStateAndWait(nodeA.action, unserializableClusterState, previousClusterState);
fail("cluster state published despite of diff errors");
} catch (Discovery.FailedToCommitClusterStateException e) {
} catch (FailedToCommitClusterStateException e) {
assertThat(e.getCause(), notNullValue());
assertThat(e.getCause().getMessage(), containsString("failed to serialize"));
}
@ -538,7 +539,7 @@ public class PublishClusterStateActionTests extends ESTestCase {
try {
publishState(master.action, clusterState, previousState, masterNodes + randomIntBetween(1, 5));
fail("cluster state publishing didn't fail despite of not having enough nodes");
} catch (Discovery.FailedToCommitClusterStateException expected) {
} catch (FailedToCommitClusterStateException expected) {
logger.debug("failed to publish as expected", expected);
}
}
@ -618,7 +619,7 @@ public class PublishClusterStateActionTests extends ESTestCase {
if (expectingToCommit == false) {
fail("cluster state publishing didn't fail despite of not have enough nodes");
}
} catch (Discovery.FailedToCommitClusterStateException exception) {
} catch (FailedToCommitClusterStateException exception) {
logger.debug("failed to publish as expected", exception);
if (expectingToCommit) {
throw exception;
@ -696,7 +697,7 @@ public class PublishClusterStateActionTests extends ESTestCase {
try {
publishState(master.action, state, master.clusterState, 2).await(1, TimeUnit.HOURS);
success = true;
} catch (Discovery.FailedToCommitClusterStateException OK) {
} catch (FailedToCommitClusterStateException OK) {
success = false;
}
logger.debug("--> publishing [{}], verifying...", success ? "succeeded" : "failed");

View File

@ -19,8 +19,8 @@
package org.elasticsearch.discovery.zen;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterModule;
@ -43,7 +43,8 @@ import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
import org.elasticsearch.discovery.zen.PublishClusterStateActionTests.AssertingAckListener;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ClusterServiceUtils;
@ -56,6 +57,7 @@ import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseOptions;
import org.elasticsearch.transport.TransportService;
import org.hamcrest.core.IsInstanceOf;
import java.io.Closeable;
import java.io.IOException;
@ -68,6 +70,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
@ -83,11 +86,11 @@ import static org.elasticsearch.cluster.routing.RoutingTableTests.updateActiveAl
import static org.elasticsearch.cluster.service.MasterServiceTests.discoveryState;
import static org.elasticsearch.discovery.zen.ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING;
import static org.elasticsearch.discovery.zen.ZenDiscovery.shouldIgnoreOrRejectNewClusterState;
import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.emptyArray;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.is;
public class ZenDiscoveryUnitTests extends ESTestCase {
@ -225,16 +228,19 @@ public class ZenDiscoveryUnitTests extends ESTestCase {
DiscoveryNodes.builder(state.nodes()).add(otherNode).masterNodeId(masterNode.getId())
).build();
try {
// publishing a new cluster state
ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent("testing", newState, state);
AssertingAckListener listener = new AssertingAckListener(newState.nodes().getSize() - 1);
expectedFDNodes = masterZen.getFaultDetectionNodes();
masterZen.publish(clusterChangedEvent, listener);
AwaitingPublishListener awaitingPublishListener = new AwaitingPublishListener();
masterZen.publish(clusterChangedEvent, awaitingPublishListener, listener);
awaitingPublishListener.await();
if (awaitingPublishListener.getException() == null) {
// publication succeeded, wait for acks
listener.await(10, TimeUnit.SECONDS);
// publish was a success, update expected FD nodes based on new cluster state
expectedFDNodes = fdNodesForState(newState, masterNode);
} catch (Discovery.FailedToCommitClusterStateException e) {
} else {
// not successful, so expectedFDNodes above should remain what it was originally assigned
assertEquals(3, minMasterNodes); // ensure min master nodes is the higher value, otherwise we shouldn't fail
}
@ -278,25 +284,50 @@ public class ZenDiscoveryUnitTests extends ESTestCase {
DiscoveryNodes.builder(discoveryState(masterMasterService).nodes()).masterNodeId(masterNode.getId())
).build();
try {
// publishing a new cluster state
ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent("testing", newState, state);
AssertingAckListener listener = new AssertingAckListener(newState.nodes().getSize() - 1);
masterZen.publish(clusterChangedEvent, listener);
AwaitingPublishListener awaitingPublishListener = new AwaitingPublishListener();
masterZen.publish(clusterChangedEvent, awaitingPublishListener, listener);
awaitingPublishListener.await();
if (awaitingPublishListener.getException() == null) {
// publication succeeded, wait for acks
listener.await(1, TimeUnit.HOURS);
// publish was a success, check that queue as cleared
assertThat(masterZen.pendingClusterStates(), emptyArray());
} catch (Discovery.FailedToCommitClusterStateException e) {
// not successful, so the pending queue should be cleaned
assertThat(Arrays.toString(masterZen.pendingClusterStates()), masterZen.pendingClusterStates(), arrayWithSize(0));
}
// queue should be cleared whether successful or not
assertThat(Arrays.toString(masterZen.pendingClusterStates()), masterZen.pendingClusterStates(), emptyArray());
} finally {
IOUtils.close(toClose);
terminate(threadPool);
}
}
private class AwaitingPublishListener implements ActionListener<Void> {
private final CountDownLatch countDownLatch = new CountDownLatch(1);
private FailedToCommitClusterStateException exception;
@Override
public synchronized void onResponse(Void aVoid) {
assertThat(countDownLatch.getCount(), is(1L));
countDownLatch.countDown();
}
@Override
public synchronized void onFailure(Exception e) {
assertThat(e, IsInstanceOf.instanceOf(FailedToCommitClusterStateException.class));
exception = (FailedToCommitClusterStateException) e;
onResponse(null);
}
public void await() throws InterruptedException {
countDownLatch.await();
}
public synchronized FailedToCommitClusterStateException getException() {
return exception;
}
}
private ZenDiscovery buildZenDiscovery(Settings settings, TransportService service, MasterService masterService,
ThreadPool threadPool) {
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);

View File

@ -0,0 +1,149 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.cluster;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import static org.apache.lucene.util.LuceneTestCase.random;
import static org.elasticsearch.test.ESTestCase.randomInt;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class FakeThreadPoolMasterService extends MasterService {
private final String name;
private final List<Runnable> pendingTasks = new ArrayList<>();
private final Consumer<Runnable> onTaskAvailableToRun;
private boolean scheduledNextTask = false;
private boolean taskInProgress = false;
private boolean waitForPublish = false;
FakeThreadPoolMasterService(String serviceName, Consumer<Runnable> onTaskAvailableToRun) {
super(Settings.EMPTY, createMockThreadPool());
this.name = serviceName;
this.onTaskAvailableToRun = onTaskAvailableToRun;
}
private static ThreadPool createMockThreadPool() {
final ThreadContext context = new ThreadContext(Settings.EMPTY);
final ThreadPool mockThreadPool = mock(ThreadPool.class);
when(mockThreadPool.getThreadContext()).thenReturn(context);
return mockThreadPool;
}
@Override
protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
return new PrioritizedEsThreadPoolExecutor(name, 1, 1, 1, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory(name),
null, null) {
@Override
public void execute(Runnable command, final TimeValue timeout, final Runnable timeoutCallback) {
execute(command);
}
@Override
public void execute(Runnable command) {
pendingTasks.add(command);
scheduleNextTaskIfNecessary();
}
};
}
private void scheduleNextTaskIfNecessary() {
if (taskInProgress == false && pendingTasks.isEmpty() == false && scheduledNextTask == false) {
scheduledNextTask = true;
onTaskAvailableToRun.accept(() -> {
assert taskInProgress == false;
assert waitForPublish == false;
assert scheduledNextTask;
final int taskIndex = randomInt(pendingTasks.size() - 1);
logger.debug("next master service task: choosing task {} of {}", taskIndex, pendingTasks.size());
final Runnable task = pendingTasks.remove(taskIndex);
taskInProgress = true;
scheduledNextTask = false;
task.run();
if (waitForPublish == false) {
taskInProgress = false;
}
scheduleNextTaskIfNecessary();
});
}
}
@Override
public ClusterState.Builder incrementVersion(ClusterState clusterState) {
// generate cluster UUID deterministically for repeatable tests
return ClusterState.builder(clusterState).incrementVersion().stateUUID(UUIDs.randomBase64UUID(random()));
}
@Override
protected void publish(ClusterChangedEvent clusterChangedEvent, TaskOutputs taskOutputs, long startTimeNS) {
assert waitForPublish == false;
waitForPublish = true;
final Discovery.AckListener ackListener = taskOutputs.createAckListener(threadPool, clusterChangedEvent.state());
clusterStatePublisher.publish(clusterChangedEvent, new ActionListener<Void>() {
private boolean listenerCalled = false;
@Override
public void onResponse(Void aVoid) {
assert listenerCalled == false;
listenerCalled = true;
assert waitForPublish;
waitForPublish = false;
try {
onPublicationSuccess(clusterChangedEvent, taskOutputs, startTimeNS);
} finally {
taskInProgress = false;
scheduleNextTaskIfNecessary();
}
}
@Override
public void onFailure(Exception e) {
assert listenerCalled == false;
listenerCalled = true;
assert waitForPublish;
waitForPublish = false;
try {
onPublicationFailed(clusterChangedEvent, taskOutputs, startTimeNS, e);
} finally {
taskInProgress = false;
scheduleNextTaskIfNecessary();
}
}
}, ackListener);
}
}

View File

@ -0,0 +1,130 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.cluster;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.test.ESTestCase;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.Matchers.equalTo;
public class FakeThreadPoolMasterServiceTests extends ESTestCase {
public void testFakeMasterService() {
List<Runnable> runnableTasks = new ArrayList<>();
AtomicReference<ClusterState> lastClusterStateRef = new AtomicReference<>();
DiscoveryNode discoveryNode = new DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(), Collections.emptyMap(),
new HashSet<>(Arrays.asList(DiscoveryNode.Role.values())), Version.CURRENT);
lastClusterStateRef.set(ClusterStateCreationUtils.state(discoveryNode, discoveryNode));
long firstClusterStateVersion = lastClusterStateRef.get().version();
AtomicReference<ActionListener<Void>> publishingCallback = new AtomicReference<>();
FakeThreadPoolMasterService masterService = new FakeThreadPoolMasterService("test", runnableTasks::add);
masterService.setClusterStateSupplier(lastClusterStateRef::get);
masterService.setClusterStatePublisher((event, publishListener, ackListener) -> {
lastClusterStateRef.set(event.state());
publishingCallback.set(publishListener);
});
masterService.start();
AtomicBoolean firstTaskCompleted = new AtomicBoolean();
masterService.submitStateUpdateTask("test1", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
return ClusterState.builder(currentState)
.metaData(MetaData.builder(currentState.metaData()).put(indexBuilder("test1"))).build();
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
assertFalse(firstTaskCompleted.get());
firstTaskCompleted.set(true);
}
@Override
public void onFailure(String source, Exception e) {
throw new AssertionError();
}
});
assertThat(runnableTasks.size(), equalTo(1));
assertThat(lastClusterStateRef.get().metaData().indices().size(), equalTo(0));
assertThat(lastClusterStateRef.get().version(), equalTo(firstClusterStateVersion));
assertNull(publishingCallback.get());
assertFalse(firstTaskCompleted.get());
runnableTasks.remove(0).run();
assertThat(lastClusterStateRef.get().metaData().indices().size(), equalTo(1));
assertThat(lastClusterStateRef.get().version(), equalTo(firstClusterStateVersion + 1));
assertNotNull(publishingCallback.get());
assertFalse(firstTaskCompleted.get());
assertThat(runnableTasks.size(), equalTo(0));
AtomicBoolean secondTaskCompleted = new AtomicBoolean();
masterService.submitStateUpdateTask("test2", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
return ClusterState.builder(currentState)
.metaData(MetaData.builder(currentState.metaData()).put(indexBuilder("test2"))).build();
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
assertFalse(secondTaskCompleted.get());
secondTaskCompleted.set(true);
}
@Override
public void onFailure(String source, Exception e) {
throw new AssertionError();
}
});
assertThat(runnableTasks.size(), equalTo(0));
publishingCallback.getAndSet(null).onResponse(null);
assertTrue(firstTaskCompleted.get());
assertThat(runnableTasks.size(), equalTo(1)); // check that new task gets queued
runnableTasks.remove(0).run();
assertThat(lastClusterStateRef.get().metaData().indices().size(), equalTo(2));
assertThat(lastClusterStateRef.get().version(), equalTo(firstClusterStateVersion + 2));
assertNotNull(publishingCallback.get());
assertFalse(secondTaskCompleted.get());
publishingCallback.getAndSet(null).onResponse(null);
assertTrue(secondTaskCompleted.get());
assertThat(runnableTasks.size(), equalTo(0)); // check that no more tasks are queued
}
private static IndexMetaData.Builder indexBuilder(String index) {
return IndexMetaData.builder(index).settings(settings(Version.CURRENT).put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0));
}
}

View File

@ -18,6 +18,7 @@
*/
package org.elasticsearch.test;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleListener;
@ -27,8 +28,8 @@ import org.elasticsearch.discovery.DiscoveryStats;
public class NoopDiscovery implements Discovery {
@Override
public void publish(ClusterChangedEvent clusterChangedEvent, AckListener ackListener) {
public void publish(ClusterChangedEvent clusterChangedEvent, ActionListener<Void> publishListener, AckListener ackListener) {
publishListener.onResponse(null);
}
@Override

View File

@ -21,7 +21,6 @@ package org.elasticsearch.test;
import org.apache.logging.log4j.core.util.Throwables;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
@ -36,14 +35,13 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.Discovery.AckListener;
import org.elasticsearch.cluster.coordination.ClusterStatePublisher;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Collections;
import java.util.EnumSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import static junit.framework.TestCase.fail;
@ -52,7 +50,10 @@ public class ClusterServiceUtils {
public static MasterService createMasterService(ThreadPool threadPool, ClusterState initialClusterState) {
MasterService masterService = new MasterService(Settings.EMPTY, threadPool);
AtomicReference<ClusterState> clusterStateRef = new AtomicReference<>(initialClusterState);
masterService.setClusterStatePublisher((event, ackListener) -> clusterStateRef.set(event.state()));
masterService.setClusterStatePublisher((event, publishListener, ackListener) -> {
clusterStateRef.set(event.state());
publishListener.onResponse(null);
});
masterService.setClusterStateSupplier(clusterStateRef::get);
masterService.start();
return masterService;
@ -158,33 +159,21 @@ public class ClusterServiceUtils {
return clusterService;
}
public static BiConsumer<ClusterChangedEvent, AckListener> createClusterStatePublisher(ClusterApplier clusterApplier) {
return (event, ackListener) -> {
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Exception> ex = new AtomicReference<>();
public static ClusterStatePublisher createClusterStatePublisher(ClusterApplier clusterApplier) {
return (event, publishListener, ackListener) ->
clusterApplier.onNewClusterState("mock_publish_to_self[" + event.source() + "]", () -> event.state(),
new ClusterApplyListener() {
@Override
public void onSuccess(String source) {
latch.countDown();
publishListener.onResponse(null);
}
@Override
public void onFailure(String source, Exception e) {
ex.set(e);
latch.countDown();
publishListener.onFailure(e);
}
}
);
try {
latch.await();
} catch (InterruptedException e) {
Throwables.rethrow(e);
}
if (ex.get() != null) {
Throwables.rethrow(ex.get());
}
};
}
public static ClusterService createClusterService(ClusterState initialState, ThreadPool threadPool) {