diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java b/core/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java index be9381a7fe6..ced8dd00982 100644 --- a/core/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java +++ b/core/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java @@ -40,8 +40,10 @@ public interface ClusterStateTaskExecutor { /** * Callback invoked after new cluster state is published. Note that * this method is not invoked if the cluster state was not updated. + * @param clusterChangedEvent the change event for this cluster state change, containing + * both old and new states */ - default void clusterStatePublished(ClusterState newClusterState) { + default void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) { } /** @@ -92,7 +94,8 @@ public interface ClusterStateTaskExecutor { } private Builder result(T task, TaskResult executionResult) { - executionResults.put(task, executionResult); + TaskResult existing = executionResults.put(task, executionResult); + assert existing == null : task + " already has result " + existing; return this; } diff --git a/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index edeaf95443f..630852361b4 100644 --- a/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -21,6 +21,7 @@ package org.elasticsearch.cluster.action.shard; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.ClusterStateTaskConfig; @@ -312,8 +313,8 @@ public class ShardStateAction extends AbstractComponent { } @Override - public void clusterStatePublished(ClusterState newClusterState) { - int numberOfUnassignedShards = newClusterState.getRoutingNodes().unassigned().size(); + public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) { + int numberOfUnassignedShards = clusterChangedEvent.state().getRoutingNodes().unassigned().size(); if (numberOfUnassignedShards > 0) { String reason = String.format(Locale.ROOT, "[%d] unassigned shards after failing shards", numberOfUnassignedShards); if (logger.isTraceEnabled()) { diff --git a/core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java b/core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java index ed4cfbd9134..16ec31be0b1 100644 --- a/core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java +++ b/core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java @@ -614,5 +614,9 @@ public class DiscoveryNodes extends AbstractDiffable implements public static DiscoveryNodes readFrom(StreamInput in, @Nullable DiscoveryNode localNode) throws IOException { return PROTO.readFrom(in, localNode); } + + public boolean isLocalNodeElectedMaster() { + return masterNodeId != null && masterNodeId.equals(localNodeId); + } } } diff --git a/core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java b/core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java index ac4830fe0ec..13bbccc863a 100644 --- a/core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java +++ b/core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java @@ -66,7 +66,9 @@ import org.elasticsearch.threadpool.ThreadPool; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; +import java.util.IdentityHashMap; import java.util.Iterator; import java.util.List; import java.util.Locale; @@ -371,34 +373,61 @@ public class ClusterService extends AbstractLifecycleComponent { public void submitStateUpdateTask(final String source, final T task, final ClusterStateTaskConfig config, final ClusterStateTaskExecutor executor, - final ClusterStateTaskListener listener - ) { - innerSubmitStateUpdateTask(source, task, config, executor, safe(listener, logger)); + final ClusterStateTaskListener listener) { + submitStateUpdateTasks(source, Collections.singletonMap(task, listener), config, executor); } - private void innerSubmitStateUpdateTask(final String source, final T task, - final ClusterStateTaskConfig config, - final ClusterStateTaskExecutor executor, - final SafeClusterStateTaskListener listener) { + /** + * Submits a batch of cluster state update tasks; submitted updates are guaranteed to be processed together, + * potentially with more tasks of the same executor. + * + * @param source the source of the cluster state update task + * @param tasks a map of update tasks and their corresponding listeners + * @param config the cluster state update task configuration + * @param executor the cluster state update task executor; tasks + * that share the same executor will be executed + * batches on this executor + * @param the type of the cluster state update task state + */ + public void submitStateUpdateTasks(final String source, + final Map tasks, final ClusterStateTaskConfig config, + final ClusterStateTaskExecutor executor) { if (!lifecycle.started()) { return; } + if (tasks.isEmpty()) { + return; + } try { - final UpdateTask updateTask = new UpdateTask<>(source, task, config, executor, listener); + // convert to an identity map to check for dups based on update tasks semantics of using identity instead of equal + final IdentityHashMap tasksIdentity = new IdentityHashMap<>(tasks); + final List> updateTasks = tasksIdentity.entrySet().stream().map( + entry -> new UpdateTask<>(source, entry.getKey(), config, executor, safe(entry.getValue(), logger)) + ).collect(Collectors.toList()); synchronized (updateTasksPerExecutor) { - updateTasksPerExecutor.computeIfAbsent(executor, k -> new ArrayList<>()).add(updateTask); + List existingTasks = updateTasksPerExecutor.computeIfAbsent(executor, k -> new ArrayList<>()); + for (@SuppressWarnings("unchecked") UpdateTask existing : existingTasks) { + if (tasksIdentity.containsKey(existing.task)) { + throw new IllegalArgumentException("task [" + existing.task + "] is already queued"); + } + } + existingTasks.addAll(updateTasks); } + final UpdateTask firstTask = updateTasks.get(0); + if (config.timeout() != null) { - updateTasksExecutor.execute(updateTask, threadPool.scheduler(), config.timeout(), () -> threadPool.generic().execute(() -> { - if (updateTask.processed.getAndSet(true) == false) { - logger.debug("cluster state update task [{}] timed out after [{}]", source, config.timeout()); - listener.onFailure(source, new ProcessClusterEventTimeoutException(config.timeout(), source)); + updateTasksExecutor.execute(firstTask, threadPool.scheduler(), config.timeout(), () -> threadPool.generic().execute(() -> { + for (UpdateTask task : updateTasks) { + if (task.processed.getAndSet(true) == false) { + logger.debug("cluster state update task [{}] timed out after [{}]", source, config.timeout()); + task.listener.onFailure(source, new ProcessClusterEventTimeoutException(config.timeout(), source)); + } } })); } else { - updateTasksExecutor.execute(updateTask); + updateTasksExecutor.execute(firstTask); } } catch (EsRejectedExecutionException e) { // ignore cases where we are shutting down..., there is really nothing interesting @@ -681,7 +710,7 @@ public class ClusterService extends AbstractLifecycleComponent { } try { - executor.clusterStatePublished(newClusterState); + executor.clusterStatePublished(clusterChangedEvent); } catch (Exception e) { logger.error("exception thrown while notifying executor of new cluster state publication [{}]", e, source); } diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java b/core/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java index fdf084639d1..9659434e03d 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java @@ -18,9 +18,13 @@ */ package org.elasticsearch.discovery.zen; -import org.elasticsearch.ElasticsearchTimeoutException; +import org.apache.lucene.store.AlreadyClosedException; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.ClusterStateTaskConfig; +import org.elasticsearch.cluster.ClusterStateTaskExecutor; +import org.elasticsearch.cluster.ClusterStateTaskListener; import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -30,21 +34,22 @@ import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.DummyTransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.discovery.zen.elect.ElectMasterService; import org.elasticsearch.discovery.zen.membership.MembershipAction; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; /** * This class processes incoming join request (passed zia {@link ZenDiscovery}). Incoming nodes @@ -52,18 +57,17 @@ import java.util.concurrent.atomic.AtomicReference; */ public class NodeJoinController extends AbstractComponent { - final ClusterService clusterService; - final RoutingService routingService; - final ElectMasterService electMaster; - final DiscoverySettings discoverySettings; - final AtomicBoolean accumulateJoins = new AtomicBoolean(false); + private final ClusterService clusterService; + private final RoutingService routingService; + private final ElectMasterService electMaster; + private final DiscoverySettings discoverySettings; + private final JoinTaskExecutor joinTaskExecutor = new JoinTaskExecutor(); - // this is site while trying to become a master - final AtomicReference electionContext = new AtomicReference<>(); + // this is set while trying to become a master + // mutation should be done under lock + private ElectionContext electionContext = null; - protected final Map> pendingJoinRequests = new HashMap<>(); - public NodeJoinController(ClusterService clusterService, RoutingService routingService, ElectMasterService electMaster, DiscoverySettings discoverySettings, Settings settings) { super(settings); this.clusterService = clusterService; @@ -75,7 +79,7 @@ public class NodeJoinController extends AbstractComponent { /** * waits for enough incoming joins from master eligible nodes to complete the master election *

- * You must start accumulating joins before calling this method. See {@link #startAccumulatingJoins()} + * You must start accumulating joins before calling this method. See {@link #startElectionContext()} *

* The method will return once the local node has been elected as master or some failure/timeout has happened. * The exact outcome is communicated via the callback parameter, which is guaranteed to be called. @@ -86,29 +90,32 @@ public class NodeJoinController extends AbstractComponent { * object **/ public void waitToBeElectedAsMaster(int requiredMasterJoins, TimeValue timeValue, final ElectionCallback callback) { - assert accumulateJoins.get() : "waitToBeElectedAsMaster is called we are not accumulating joins"; - final CountDownLatch done = new CountDownLatch(1); - final ElectionContext newContext = new ElectionContext(callback, requiredMasterJoins) { + final ElectionCallback wrapperCallback = new ElectionCallback() { @Override - void onClose() { - if (electionContext.compareAndSet(this, null)) { - stopAccumulatingJoins("election closed"); - } else { - assert false : "failed to remove current election context"; - } + public void onElectedAsMaster(ClusterState state) { done.countDown(); + callback.onElectedAsMaster(state); + } + + @Override + public void onFailure(Throwable t) { + done.countDown(); + callback.onFailure(t); } }; - if (electionContext.compareAndSet(null, newContext) == false) { - // should never happen, but be conservative - failContext(newContext, new IllegalStateException("double waiting for election")); - return; - } + ElectionContext myElectionContext = null; + try { // check what we have so far.. - checkPendingJoinsAndElectIfNeeded(); + // capture the context we add the callback to make sure we fail our own + synchronized (this) { + assert electionContext != null : "waitToBeElectedAsMaster is called we are not accumulating joins"; + myElectionContext = electionContext; + electionContext.onAttemptToBeElected(requiredMasterJoins, wrapperCallback); + checkPendingJoinsAndElectIfNeeded(); + } try { if (done.await(timeValue.millis(), TimeUnit.MILLISECONDS)) { @@ -119,69 +126,46 @@ public class NodeJoinController extends AbstractComponent { } if (logger.isTraceEnabled()) { - final int pendingNodes; - synchronized (pendingJoinRequests) { - pendingNodes = pendingJoinRequests.size(); - } - logger.trace("timed out waiting to be elected. waited [{}]. pending node joins [{}]", timeValue, pendingNodes); + final int pendingNodes = myElectionContext.getPendingMasterJoinsCount(); + logger.trace("timed out waiting to be elected. waited [{}]. pending master node joins [{}]", timeValue, pendingNodes); } - // callback will clear the context, if it's active - failContext(newContext, new ElasticsearchTimeoutException("timed out waiting to be elected")); + failContextIfNeeded(myElectionContext, "timed out waiting to be elected"); } catch (Throwable t) { logger.error("unexpected failure while waiting for incoming joins", t); - failContext(newContext, "unexpected failure while waiting for pending joins", t); + if (myElectionContext != null) { + failContextIfNeeded(myElectionContext, "unexpected failure while waiting for pending joins [" + t.getMessage() + "]"); + } } } - private void failContext(final ElectionContext context, final Throwable throwable) { - failContext(context, throwable.getMessage(), throwable); - } - - /** utility method to fail the given election context under the cluster state thread */ - private void failContext(final ElectionContext context, final String reason, final Throwable throwable) { - clusterService.submitStateUpdateTask("zen-disco-join(failure [" + reason + "])", new ClusterStateUpdateTask(Priority.IMMEDIATE) { - - @Override - public boolean runOnlyOnMaster() { - return false; - } - - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - context.onFailure(throwable); - return currentState; - } - - @Override - public void onFailure(String source, Throwable updateFailure) { - logger.warn("unexpected error while trying to fail election context due to [{}]. original exception [{}]", updateFailure, reason, throwable); - context.onFailure(updateFailure); - } - }); - + /** + * utility method to fail the given election context under the cluster state thread + */ + private synchronized void failContextIfNeeded(final ElectionContext context, final String reason) { + if (electionContext == context) { + stopElectionContext(reason); + } } /** * Accumulates any future incoming join request. Pending join requests will be processed in the final steps of becoming a - * master or when {@link #stopAccumulatingJoins(String)} is called. + * master or when {@link #stopElectionContext(String)} is called. */ - public void startAccumulatingJoins() { - logger.trace("starting to accumulate joins"); - boolean b = accumulateJoins.getAndSet(true); - assert b == false : "double startAccumulatingJoins() calls"; - assert electionContext.get() == null : "startAccumulatingJoins() called, but there is an ongoing election context"; + public synchronized void startElectionContext() { + logger.trace("starting an election context, will accumulate joins"); + assert electionContext == null : "double startElectionContext() calls"; + electionContext = new ElectionContext(); } - /** Stopped accumulating joins. All pending joins will be processed. Future joins will be processed immediately */ - public void stopAccumulatingJoins(String reason) { - logger.trace("stopping join accumulation ([{}])", reason); - assert electionContext.get() == null : "stopAccumulatingJoins() called, but there is an ongoing election context"; - boolean b = accumulateJoins.getAndSet(false); - assert b : "stopAccumulatingJoins() called but not accumulating"; - synchronized (pendingJoinRequests) { - if (pendingJoinRequests.size() > 0) { - processJoins("pending joins after accumulation stop [" + reason + "]"); - } + /** + * Stopped accumulating joins. All pending joins will be processed. Future joins will be processed immediately + */ + public void stopElectionContext(String reason) { + logger.trace("stopping election ([{}])", reason); + synchronized (this) { + assert electionContext != null : "stopElectionContext() called but not accumulating"; + electionContext.closeAndProcessPending(reason); + electionContext = null; } } @@ -190,19 +174,14 @@ public class NodeJoinController extends AbstractComponent { *

* Note: doesn't do any validation. This should have been done before. */ - public void handleJoinRequest(final DiscoveryNode node, final MembershipAction.JoinCallback callback) { - synchronized (pendingJoinRequests) { - List nodeCallbacks = pendingJoinRequests.get(node); - if (nodeCallbacks == null) { - nodeCallbacks = new ArrayList<>(); - pendingJoinRequests.put(node, nodeCallbacks); - } - nodeCallbacks.add(callback); - } - if (accumulateJoins.get() == false) { - processJoins("join from node[" + node + "]"); - } else { + public synchronized void handleJoinRequest(final DiscoveryNode node, final MembershipAction.JoinCallback callback) { + if (electionContext != null) { + electionContext.addIncomingJoin(node, callback); checkPendingJoinsAndElectIfNeeded(); + } else { + clusterService.submitStateUpdateTask("zen-disco-join(node " + node + "])", + node, ClusterStateTaskConfig.build(Priority.URGENT), + joinTaskExecutor, new JoinTaskListener(callback, logger)); } } @@ -210,86 +189,24 @@ public class NodeJoinController extends AbstractComponent { * checks if there is an on going request to become master and if it has enough pending joins. If so, the node will * become master via a ClusterState update task. */ - private void checkPendingJoinsAndElectIfNeeded() { - assert accumulateJoins.get() : "election check requested but we are not accumulating joins"; - final ElectionContext context = electionContext.get(); - if (context == null) { - return; + private synchronized void checkPendingJoinsAndElectIfNeeded() { + assert electionContext != null : "election check requested but no active context"; + final int pendingMasterJoins = electionContext.getPendingMasterJoinsCount(); + if (electionContext.isEnoughPendingJoins(pendingMasterJoins) == false) { + if (logger.isTraceEnabled()) { + logger.trace("not enough joins for election. Got [{}], required [{}]", pendingMasterJoins, + electionContext.requiredMasterJoins); + } + } else { + if (logger.isTraceEnabled()) { + logger.trace("have enough joins for election. Got [{}], required [{}]", pendingMasterJoins, + electionContext.requiredMasterJoins); + } + electionContext.closeAndBecomeMaster(); + electionContext = null; // clear this out so future joins won't be accumulated } - - int pendingMasterJoins = 0; - synchronized (pendingJoinRequests) { - for (DiscoveryNode node : pendingJoinRequests.keySet()) { - if (node.isMasterNode()) { - pendingMasterJoins++; - } - } - } - if (pendingMasterJoins < context.requiredMasterJoins) { - if (context.pendingSetAsMasterTask.get() == false) { - logger.trace("not enough joins for election. Got [{}], required [{}]", pendingMasterJoins, context.requiredMasterJoins); - } - return; - } - if (context.pendingSetAsMasterTask.getAndSet(true)) { - logger.trace("elected as master task already submitted, ignoring..."); - return; - } - - final String source = "zen-disco-join(elected_as_master, [" + pendingMasterJoins + "] joins received)"; - clusterService.submitStateUpdateTask(source, new ProcessJoinsTask(Priority.IMMEDIATE) { - @Override - public ClusterState execute(ClusterState currentState) { - // Take into account the previous known nodes, if they happen not to be available - // then fault detection will remove these nodes. - - if (currentState.nodes().getMasterNode() != null) { - // TODO can we tie break here? we don't have a remote master cluster state version to decide on - logger.trace("join thread elected local node as master, but there is already a master in place: {}", currentState.nodes().getMasterNode()); - throw new NotMasterException("Node [" + clusterService.localNode() + "] not master for join request"); - } - - DiscoveryNodes.Builder builder = new DiscoveryNodes.Builder(currentState.nodes()).masterNodeId(currentState.nodes().getLocalNode().getId()); - // update the fact that we are the master... - ClusterBlocks clusterBlocks = ClusterBlocks.builder().blocks(currentState.blocks()).removeGlobalBlock(discoverySettings.getNoMasterBlock()).build(); - currentState = ClusterState.builder(currentState).nodes(builder).blocks(clusterBlocks).build(); - - // reroute now to remove any dead nodes (master may have stepped down when they left and didn't update the routing table) - RoutingAllocation.Result result = routingService.getAllocationService().reroute(currentState, "nodes joined"); - if (result.changed()) { - currentState = ClusterState.builder(currentState).routingResult(result).build(); - } - - // Add the incoming join requests. - // Note: we only do this now (after the reroute) to avoid assigning shards to these nodes. - return super.execute(currentState); - } - - @Override - public boolean runOnlyOnMaster() { - return false; - } - - @Override - public void onFailure(String source, Throwable t) { - super.onFailure(source, t); - context.onFailure(t); - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - super.clusterStateProcessed(source, oldState, newState); - context.onElectedAsMaster(newState); - } - }); } - /** process all pending joins */ - private void processJoins(String reason) { - clusterService.submitStateUpdateTask("zen-disco-join(" + reason + ")", new ProcessJoinsTask(Priority.URGENT)); - } - - public interface ElectionCallback { /** * called when the local node is successfully elected as master @@ -304,119 +221,143 @@ public class NodeJoinController extends AbstractComponent { void onFailure(Throwable t); } - static abstract class ElectionContext implements ElectionCallback { - private final ElectionCallback callback; - private final int requiredMasterJoins; + class ElectionContext { + private ElectionCallback callback = null; + private int requiredMasterJoins = -1; + private final Map> joinRequestAccumulator = new HashMap<>(); - /** set to true after enough joins have been seen and a cluster update task is submitted to become master */ - final AtomicBoolean pendingSetAsMasterTask = new AtomicBoolean(); final AtomicBoolean closed = new AtomicBoolean(); - ElectionContext(ElectionCallback callback, int requiredMasterJoins) { - this.callback = callback; + public synchronized void onAttemptToBeElected(int requiredMasterJoins, ElectionCallback callback) { + ensureOpen(); + assert this.requiredMasterJoins < 0; + assert this.callback == null; this.requiredMasterJoins = requiredMasterJoins; + this.callback = callback; } - abstract void onClose(); + public synchronized void addIncomingJoin(DiscoveryNode node, MembershipAction.JoinCallback callback) { + ensureOpen(); + joinRequestAccumulator.computeIfAbsent(node, n -> new ArrayList<>()).add(callback); + } - @Override - public void onElectedAsMaster(ClusterState state) { - assert pendingSetAsMasterTask.get() : "onElectedAsMaster called but pendingSetAsMasterTask is not set"; + + public synchronized boolean isEnoughPendingJoins(int pendingMasterJoins) { + final boolean hasEnough; + if (requiredMasterJoins < 0) { + // requiredMasterNodes is unknown yet, return false and keep on waiting + hasEnough = false; + } else { + assert callback != null : "requiredMasterJoins is set but not the callback"; + hasEnough = pendingMasterJoins >= requiredMasterJoins; + } + return hasEnough; + } + + private Map getPendingAsTasks() { + Map tasks = new HashMap<>(); + joinRequestAccumulator.entrySet().stream().forEach(e -> tasks.put(e.getKey(), new JoinTaskListener(e.getValue(), logger))); + return tasks; + } + + public synchronized int getPendingMasterJoinsCount() { + int pendingMasterJoins = 0; + for (DiscoveryNode node : joinRequestAccumulator.keySet()) { + if (node.isMasterNode()) { + pendingMasterJoins++; + } + } + return pendingMasterJoins; + } + + public synchronized void closeAndBecomeMaster() { + assert callback != null : "becoming a master but the callback is not yet set"; + assert isEnoughPendingJoins(getPendingMasterJoinsCount()) : "becoming a master but pending joins of " + + getPendingMasterJoinsCount() + " are not enough. needs [" + requiredMasterJoins + "];"; + + innerClose(); + + Map tasks = getPendingAsTasks(); + final String source = "zen-disco-join(elected_as_master, [" + tasks.size() + "] nodes joined)"; + + tasks.put(BECOME_MASTER_TASK, joinProcessedListener); + clusterService.submitStateUpdateTasks(source, tasks, ClusterStateTaskConfig.build(Priority.URGENT), joinTaskExecutor); + } + + public synchronized void closeAndProcessPending(String reason) { + innerClose(); + Map tasks = getPendingAsTasks(); + final String source = "zen-disco-join(election stopped [" + reason + "] nodes joined"; + + tasks.put(FINISH_ELECTION_NOT_MASTER_TASK, joinProcessedListener); + clusterService.submitStateUpdateTasks(source, tasks, ClusterStateTaskConfig.build(Priority.URGENT), joinTaskExecutor); + } + + private void innerClose() { + if (closed.getAndSet(true)) { + throw new AlreadyClosedException("election context is already closed"); + } + } + + private void ensureOpen() { + if (closed.get()) { + throw new AlreadyClosedException("election context is already closed"); + } + } + + private synchronized ElectionCallback getCallback() { + return callback; + } + + private void onElectedAsMaster(ClusterState state) { ClusterService.assertClusterStateThread(); assert state.nodes().isLocalNodeElectedMaster() : "onElectedAsMaster called but local node is not master"; - if (closed.compareAndSet(false, true)) { - try { - onClose(); - } finally { - callback.onElectedAsMaster(state); - } + ElectionCallback callback = getCallback(); // get under lock + if (callback != null) { + callback.onElectedAsMaster(state); } } - @Override - public void onFailure(Throwable t) { + private void onFailure(Throwable t) { ClusterService.assertClusterStateThread(); - if (closed.compareAndSet(false, true)) { - try { - onClose(); - } finally { - callback.onFailure(t); - } + ElectionCallback callback = getCallback(); // get under lock + if (callback != null) { + callback.onFailure(t); } } + + private final ClusterStateTaskListener joinProcessedListener = new ClusterStateTaskListener() { + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + assert newState.nodes().isLocalNodeElectedMaster() : "should have become a master but isn't " + newState.prettyPrint(); + onElectedAsMaster(newState); + } + + @Override + public void onFailure(String source, Throwable t) { + ElectionContext.this.onFailure(t); + } + }; + } + static class JoinTaskListener implements ClusterStateTaskListener { + final List callbacks; + final private ESLogger logger; - /** - * Processes any pending joins via a ClusterState update task. - * Note: this task automatically fails (and fails all pending joins) if the current node is not marked as master - */ - class ProcessJoinsTask extends ClusterStateUpdateTask { + JoinTaskListener(MembershipAction.JoinCallback callback, ESLogger logger) { + this(Collections.singletonList(callback), logger); + } - private final List joinCallbacksToRespondTo = new ArrayList<>(); - private boolean nodeAdded = false; - - public ProcessJoinsTask(Priority priority) { - super(priority); + JoinTaskListener(List callbacks, ESLogger logger) { + this.callbacks = callbacks; + this.logger = logger; } @Override - public ClusterState execute(ClusterState currentState) { - DiscoveryNodes.Builder nodesBuilder; - synchronized (pendingJoinRequests) { - if (pendingJoinRequests.isEmpty()) { - return currentState; - } - - nodesBuilder = DiscoveryNodes.builder(currentState.nodes()); - Iterator>> iterator = pendingJoinRequests.entrySet().iterator(); - while (iterator.hasNext()) { - Map.Entry> entry = iterator.next(); - final DiscoveryNode node = entry.getKey(); - joinCallbacksToRespondTo.addAll(entry.getValue()); - iterator.remove(); - if (currentState.nodes().nodeExists(node.getId())) { - logger.debug("received a join request for an existing node [{}]", node); - } else { - nodeAdded = true; - nodesBuilder.put(node); - for (DiscoveryNode existingNode : currentState.nodes()) { - if (node.getAddress().equals(existingNode.getAddress())) { - nodesBuilder.remove(existingNode.getId()); - logger.warn("received join request from node [{}], but found existing node {} with same address, removing existing node", node, existingNode); - } - } - } - } - } - - // we must return a new cluster state instance to force publishing. This is important - // for the joining node to finalize it's join and set us as a master - final ClusterState.Builder newState = ClusterState.builder(currentState); - if (nodeAdded) { - newState.nodes(nodesBuilder); - } - - return newState.build(); - } - - @Override - public void onNoLongerMaster(String source) { - // we are rejected, so drain all pending task (execute never run) - synchronized (pendingJoinRequests) { - Iterator>> iterator = pendingJoinRequests.entrySet().iterator(); - while (iterator.hasNext()) { - Map.Entry> entry = iterator.next(); - joinCallbacksToRespondTo.addAll(entry.getValue()); - iterator.remove(); - } - } - Exception e = new NotMasterException("Node [" + clusterService.localNode() + "] not master for join request"); - innerOnFailure(e); - } - - void innerOnFailure(Throwable t) { - for (MembershipAction.JoinCallback callback : joinCallbacksToRespondTo) { + public void onFailure(String source, Throwable t) { + for (MembershipAction.JoinCallback callback : callbacks) { try { callback.onFailure(t); } catch (Exception e) { @@ -425,29 +366,111 @@ public class NodeJoinController extends AbstractComponent { } } - @Override - public void onFailure(String source, Throwable t) { - logger.error("unexpected failure during [{}]", t, source); - innerOnFailure(t); - } - @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - if (nodeAdded) { - // we reroute not in the same cluster state update since in certain areas we rely on - // the node to be in the cluster state (sampled from ClusterService#state) to be there, also - // shard transitions need to better be handled in such cases - routingService.reroute("post_node_add"); - } - for (MembershipAction.JoinCallback callback : joinCallbacksToRespondTo) { + for (MembershipAction.JoinCallback callback : callbacks) { try { callback.onSuccess(); } catch (Exception e) { logger.error("unexpected error during [{}]", e, source); } } + } + } - NodeJoinController.this.electMaster.logMinimumMasterNodesWarningIfNecessary(oldState, newState); + // a task indicated that the current node should become master, if no current master is known + private final static DiscoveryNode BECOME_MASTER_TASK = new DiscoveryNode("_BECOME_MASTER_TASK_", DummyTransportAddress.INSTANCE, + Collections.emptyMap(), Collections.emptySet(), Version.CURRENT); + + // a task that is used to process pending joins without explicitly becoming a master and listening to the results + // this task is used when election is stop without the local node becoming a master per se (though it might + private final static DiscoveryNode FINISH_ELECTION_NOT_MASTER_TASK = new DiscoveryNode("_NOT_MASTER_TASK_", + DummyTransportAddress.INSTANCE, Collections.emptyMap(), Collections.emptySet(), Version.CURRENT); + + class JoinTaskExecutor implements ClusterStateTaskExecutor { + + @Override + public BatchResult execute(ClusterState currentState, List joiningNodes) throws Exception { + final DiscoveryNodes currentNodes = currentState.nodes(); + final BatchResult.Builder results = BatchResult.builder(); + boolean nodesChanged = false; + ClusterState.Builder newState = ClusterState.builder(currentState); + DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(currentNodes); + + if (currentNodes.getMasterNode() == null && joiningNodes.contains(BECOME_MASTER_TASK)) { + // use these joins to try and become the master. + // Note that we don't have to do any validation of the amount of joining nodes - the commit + // during the cluster state publishing guarantees that we have enough + + nodesBuilder.masterNodeId(currentNodes.getLocalNodeId()); + ClusterBlocks clusterBlocks = ClusterBlocks.builder().blocks(currentState.blocks()) + .removeGlobalBlock(discoverySettings.getNoMasterBlock()).build(); + newState.blocks(clusterBlocks); + newState.nodes(nodesBuilder); + nodesChanged = true; + + // reroute now to remove any dead nodes (master may have stepped down when they left and didn't update the routing table) + // Note: also do it now to avoid assigning shards to these nodes. We will have another reroute after the cluster + // state is published. + // TODO: this publishing of a cluster state with no nodes assigned to joining nodes shouldn't be needed anymore. remove. + + final ClusterState tmpState = newState.build(); + RoutingAllocation.Result result = routingService.getAllocationService().reroute(tmpState, "nodes joined"); + newState = ClusterState.builder(tmpState); + if (result.changed()) { + newState.routingResult(result); + } + nodesBuilder = DiscoveryNodes.builder(tmpState.nodes()); + } + + if (nodesBuilder.isLocalNodeElectedMaster() == false) { + logger.trace("processing node joins, but we are not the master. current master: {}", currentNodes.getMasterNode()); + throw new NotMasterException("Node [" + currentNodes.getLocalNode() + "] not master for join request"); + } + + for (final DiscoveryNode node : joiningNodes) { + if (node.equals(BECOME_MASTER_TASK) || node.equals(FINISH_ELECTION_NOT_MASTER_TASK)) { + // noop + } else if (currentNodes.nodeExists(node.getId())) { + logger.debug("received a join request for an existing node [{}]", node); + } else { + nodesChanged = true; + nodesBuilder.put(node); + for (DiscoveryNode existingNode : currentNodes) { + if (node.getAddress().equals(existingNode.getAddress())) { + nodesBuilder.remove(existingNode.getId()); + logger.warn("received join request from node [{}], but found existing node {} with same address, removing existing node", node, existingNode); + } + } + } + results.success(node); + } + + if (nodesChanged) { + newState.nodes(nodesBuilder); + } + + // we must return a new cluster state instance to force publishing. This is important + // for the joining node to finalize its join and set us as a master + return results.build(newState.build()); + } + + @Override + public boolean runOnlyOnMaster() { + // we validate that we are allowed to change the cluster state during cluster state processing + return false; + } + + @Override + public void clusterStatePublished(ClusterChangedEvent event) { + if (event.nodesDelta().hasChanges()) { + // we reroute not in the same cluster state update since in certain areas we rely on + // the node to be in the cluster state (sampled from ClusterService#state) to be there, also + // shard transitions need to better be handled in such cases + routingService.reroute("post_node_add"); + } + + NodeJoinController.this.electMaster.logMinimumMasterNodesWarningIfNecessary(event.previousState(), event.state()); } } } diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index 4d2b9e7c5e7..3c949b768d1 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -372,7 +372,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen private void innerJoinCluster() { DiscoveryNode masterNode = null; final Thread currentThread = Thread.currentThread(); - nodeJoinController.startAccumulatingJoins(); + nodeJoinController.startElectionContext(); while (masterNode == null && joinThreadControl.joinThreadActive(currentThread)) { masterNode = findMaster(); } @@ -406,7 +406,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen ); } else { // process any incoming joins (they will fail because we are not the master) - nodeJoinController.stopAccumulatingJoins("not master"); + nodeJoinController.stopElectionContext("not master"); // send join request final boolean success = joinElectedMaster(masterNode); diff --git a/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceTests.java b/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceTests.java index bf85f67e48e..dd71fa6a4a7 100644 --- a/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.cluster.service; import org.apache.log4j.Level; import org.apache.log4j.Logger; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterName; @@ -35,6 +36,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.OperationRouting; import org.elasticsearch.common.Priority; import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.DummyTransportAddress; @@ -51,8 +53,10 @@ import org.junit.BeforeClass; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -62,12 +66,15 @@ import java.util.concurrent.CyclicBarrier; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; import static org.elasticsearch.test.ClusterServiceUtils.setState; +import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.is; public class ClusterServiceTests extends ESTestCase { @@ -102,10 +109,10 @@ public class ClusterServiceTests extends ESTestCase { TimedClusterService createTimedClusterService(boolean makeMaster) throws InterruptedException { TimedClusterService timedClusterService = new TimedClusterService(Settings.EMPTY, null, - new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - threadPool, new ClusterName("ClusterServiceTests")); + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + threadPool, new ClusterName("ClusterServiceTests")); timedClusterService.setLocalNode(new DiscoveryNode("node1", DummyTransportAddress.INSTANCE, emptyMap(), - emptySet(), Version.CURRENT)); + emptySet(), Version.CURRENT)); timedClusterService.setNodeConnectionsService(new NodeConnectionsService(Settings.EMPTY, null, null) { @Override public void connectToAddedNodes(ClusterChangedEvent event) { @@ -123,9 +130,9 @@ public class ClusterServiceTests extends ESTestCase { ClusterState state = timedClusterService.state(); final DiscoveryNodes nodes = state.nodes(); final DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(nodes) - .masterNodeId(makeMaster ? nodes.getLocalNodeId() : null); + .masterNodeId(makeMaster ? nodes.getLocalNodeId() : null); state = ClusterState.builder(state).blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK) - .nodes(nodesBuilder).build(); + .nodes(nodesBuilder).build(); setState(timedClusterService, state); return timedClusterService; } @@ -253,37 +260,37 @@ public class ClusterServiceTests extends ESTestCase { AtomicBoolean published = new AtomicBoolean(); clusterService.submitStateUpdateTask( - "testClusterStateTaskListenerThrowingExceptionIsOkay", - new Object(), - ClusterStateTaskConfig.build(Priority.NORMAL), - new ClusterStateTaskExecutor() { - @Override - public boolean runOnlyOnMaster() { - return false; - } - - @Override - public BatchResult execute(ClusterState currentState, List tasks) throws Exception { - ClusterState newClusterState = ClusterState.builder(currentState).build(); - return BatchResult.builder().successes(tasks).build(newClusterState); - } - - @Override - public void clusterStatePublished(ClusterState newClusterState) { - published.set(true); - latch.countDown(); - } - }, - new ClusterStateTaskListener() { - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - throw new IllegalStateException(source); - } - - @Override - public void onFailure(String source, Throwable t) { - } + "testClusterStateTaskListenerThrowingExceptionIsOkay", + new Object(), + ClusterStateTaskConfig.build(Priority.NORMAL), + new ClusterStateTaskExecutor() { + @Override + public boolean runOnlyOnMaster() { + return false; } + + @Override + public BatchResult execute(ClusterState currentState, List tasks) throws Exception { + ClusterState newClusterState = ClusterState.builder(currentState).build(); + return BatchResult.builder().successes(tasks).build(newClusterState); + } + + @Override + public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) { + published.set(true); + latch.countDown(); + } + }, + new ClusterStateTaskListener() { + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + throw new IllegalStateException(source); + } + + @Override + public void onFailure(String source, Throwable t) { + } + } ); latch.await(); @@ -342,7 +349,7 @@ public class ClusterServiceTests extends ESTestCase { barrier.await(); for (int j = 0; j < tasksSubmittedPerThread; j++) { clusterService.submitStateUpdateTask("[" + index + "][" + j + "]", j, - ClusterStateTaskConfig.build(randomFrom(Priority.values())), executors[index], listener); + ClusterStateTaskConfig.build(randomFrom(Priority.values())), executors[index], listener); } barrier.await(); } catch (InterruptedException | BrokenBarrierException e) { @@ -370,10 +377,43 @@ public class ClusterServiceTests extends ESTestCase { } } + public void testSingleBatchSubmission() throws InterruptedException { + Map tasks = new HashMap<>(); + final int numOfTasks = randomInt(10); + final CountDownLatch latch = new CountDownLatch(numOfTasks); + for (int i = 0; i < numOfTasks; i++) { + tasks.put(randomInt(1024), new ClusterStateTaskListener() { + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + latch.countDown(); + } + + @Override + public void onFailure(String source, Throwable t) { + fail(ExceptionsHelper.detailedMessage(t)); + } + }); + } + + clusterService.submitStateUpdateTasks("test", tasks, ClusterStateTaskConfig.build(Priority.LANGUID), + (currentState, taskList) -> { + assertThat(taskList.size(), equalTo(tasks.size())); + assertThat(taskList.stream().collect(Collectors.toSet()), equalTo(tasks.keySet())); + return ClusterStateTaskExecutor.BatchResult.builder().successes(taskList).build(currentState); + }); + + latch.await(); + } + public void testClusterStateBatchedUpdates() throws BrokenBarrierException, InterruptedException { AtomicInteger counter = new AtomicInteger(); class Task { private AtomicBoolean state = new AtomicBoolean(); + private final int id; + + Task(int id) { + this.id = id; + } public void execute() { if (!state.compareAndSet(false, true)) { @@ -382,21 +422,54 @@ public class ClusterServiceTests extends ESTestCase { counter.incrementAndGet(); } } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Task task = (Task) o; + return id == task.id; + + } + + @Override + public int hashCode() { + return id; + } + + @Override + public String toString() { + return Integer.toString(id); + } } int numberOfThreads = randomIntBetween(2, 8); - int tasksSubmittedPerThread = randomIntBetween(1, 1024); + int taskSubmissionsPerThread = randomIntBetween(1, 64); int numberOfExecutors = Math.max(1, numberOfThreads / 4); final Semaphore semaphore = new Semaphore(numberOfExecutors); class TaskExecutor implements ClusterStateTaskExecutor { + private final List> taskGroups; private AtomicInteger counter = new AtomicInteger(); private AtomicInteger batches = new AtomicInteger(); private AtomicInteger published = new AtomicInteger(); + public TaskExecutor(List> taskGroups) { + this.taskGroups = taskGroups; + } + @Override public BatchResult execute(ClusterState currentState, List tasks) throws Exception { - tasks.forEach(task -> task.execute()); + for (Set expectedSet : taskGroups) { + long count = tasks.stream().filter(expectedSet::contains).count(); + assertThat("batched set should be executed together or not at all. Expected " + expectedSet + "s. Executing " + tasks, + count, anyOf(equalTo(0L), equalTo((long) expectedSet.size()))); + } + tasks.forEach(Task::execute); counter.addAndGet(tasks.size()); ClusterState maybeUpdatedClusterState = currentState; if (randomBoolean()) { @@ -413,59 +486,85 @@ public class ClusterServiceTests extends ESTestCase { } @Override - public void clusterStatePublished(ClusterState newClusterState) { + public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) { published.incrementAndGet(); semaphore.release(); } } - ConcurrentMap counters = new ConcurrentHashMap<>(); - CountDownLatch updateLatch = new CountDownLatch(numberOfThreads * tasksSubmittedPerThread); - ClusterStateTaskListener listener = new ClusterStateTaskListener() { - @Override - public void onFailure(String source, Throwable t) { - assert false; - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - counters.computeIfAbsent(source, key -> new AtomicInteger()).incrementAndGet(); - updateLatch.countDown(); - } - }; + ConcurrentMap processedStates = new ConcurrentHashMap<>(); + List> taskGroups = new ArrayList<>(); List executors = new ArrayList<>(); for (int i = 0; i < numberOfExecutors; i++) { - executors.add(new TaskExecutor()); + executors.add(new TaskExecutor(taskGroups)); } // randomly assign tasks to executors - List assignments = new ArrayList<>(); + List>> assignments = new ArrayList<>(); + int taskId = 0; for (int i = 0; i < numberOfThreads; i++) { - for (int j = 0; j < tasksSubmittedPerThread; j++) { - assignments.add(randomFrom(executors)); + for (int j = 0; j < taskSubmissionsPerThread; j++) { + TaskExecutor executor = randomFrom(executors); + Set tasks = new HashSet<>(); + for (int t = randomInt(3); t >= 0; t--) { + tasks.add(new Task(taskId++)); + } + taskGroups.add(tasks); + assignments.add(Tuple.tuple(executor, tasks)); } } Map counts = new HashMap<>(); - for (TaskExecutor executor : assignments) { - counts.merge(executor, 1, (previous, one) -> previous + one); + int totalTaskCount = 0; + for (Tuple> assignment : assignments) { + final int taskCount = assignment.v2().size(); + counts.merge(assignment.v1(), taskCount, (previous, count) -> previous + count); + totalTaskCount += taskCount; } + final CountDownLatch updateLatch = new CountDownLatch(totalTaskCount); + final ClusterStateTaskListener listener = new ClusterStateTaskListener() { + @Override + public void onFailure(String source, Throwable t) { + fail(ExceptionsHelper.detailedMessage(t)); + } + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + processedStates.computeIfAbsent(source, key -> new AtomicInteger()).incrementAndGet(); + updateLatch.countDown(); + } + }; + + final ConcurrentMap submittedTasksPerThread = new ConcurrentHashMap<>(); CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads); for (int i = 0; i < numberOfThreads; i++) { final int index = i; Thread thread = new Thread(() -> { + final String threadName = Thread.currentThread().getName(); try { barrier.await(); - for (int j = 0; j < tasksSubmittedPerThread; j++) { - ClusterStateTaskExecutor executor = assignments.get(index * tasksSubmittedPerThread + j); - clusterService.submitStateUpdateTask( - Thread.currentThread().getName(), - new Task(), + for (int j = 0; j < taskSubmissionsPerThread; j++) { + Tuple> assignment = assignments.get(index * taskSubmissionsPerThread + j); + final Set tasks = assignment.v2(); + submittedTasksPerThread.computeIfAbsent(threadName, key -> new AtomicInteger()).addAndGet(tasks.size()); + final TaskExecutor executor = assignment.v1(); + if (tasks.size() == 1) { + clusterService.submitStateUpdateTask( + threadName, + tasks.stream().findFirst().get(), ClusterStateTaskConfig.build(randomFrom(Priority.values())), executor, listener); + } else { + Map taskListeners = new HashMap<>(); + tasks.stream().forEach(t -> taskListeners.put(t, listener)); + clusterService.submitStateUpdateTasks( + threadName, + taskListeners, ClusterStateTaskConfig.build(randomFrom(Priority.values())), + executor + ); + } } barrier.await(); } catch (BrokenBarrierException | InterruptedException e) { @@ -486,7 +585,7 @@ public class ClusterServiceTests extends ESTestCase { semaphore.acquire(numberOfExecutors); // assert the number of executed tasks is correct - assertEquals(numberOfThreads * tasksSubmittedPerThread, counter.get()); + assertEquals(totalTaskCount, counter.get()); // assert each executor executed the correct number of tasks for (TaskExecutor executor : executors) { @@ -497,8 +596,10 @@ public class ClusterServiceTests extends ESTestCase { } // assert the correct number of clusterStateProcessed events were triggered - for (Map.Entry entry : counters.entrySet()) { - assertEquals(entry.getValue().get(), tasksSubmittedPerThread); + for (Map.Entry entry : processedStates.entrySet()) { + assertThat(submittedTasksPerThread, hasKey(entry.getKey())); + assertEquals("not all tasks submitted by " + entry.getKey() + " received a processed event", + entry.getValue().get(), submittedTasksPerThread.get(entry.getKey()).get()); } } @@ -506,9 +607,6 @@ public class ClusterServiceTests extends ESTestCase { * Note, this test can only work as long as we have a single thread executor executing the state update tasks! */ public void testPrioritizedTasks() throws Exception { - Settings settings = Settings.builder() - .put("discovery.type", "local") - .build(); BlockingTask block = new BlockingTask(Priority.IMMEDIATE); clusterService.submitStateUpdateTask("test", block); int taskCount = randomIntBetween(5, 20); @@ -522,7 +620,7 @@ public class ClusterServiceTests extends ESTestCase { clusterService.submitStateUpdateTask("test", new PrioritizedTask(priority, latch, tasks)); } - block.release(); + block.close(); latch.await(); Priority prevPriority = null; @@ -535,15 +633,49 @@ public class ClusterServiceTests extends ESTestCase { } } + public void testDuplicateSubmission() throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(2); + try (BlockingTask blockingTask = new BlockingTask(Priority.IMMEDIATE)) { + clusterService.submitStateUpdateTask("blocking", blockingTask); + + ClusterStateTaskExecutor executor = (currentState, tasks) -> + ClusterStateTaskExecutor.BatchResult.builder().successes(tasks).build(currentState); + + SimpleTask task = new SimpleTask(1); + ClusterStateTaskListener listener = new ClusterStateTaskListener() { + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + latch.countDown(); + } + + @Override + public void onFailure(String source, Throwable t) { + fail(ExceptionsHelper.detailedMessage(t)); + } + }; + + clusterService.submitStateUpdateTask("first time", task, ClusterStateTaskConfig.build(Priority.NORMAL), executor, listener); + + expectThrows(IllegalArgumentException.class, () -> clusterService.submitStateUpdateTask("second time", task, + ClusterStateTaskConfig.build(Priority.NORMAL), executor, listener)); + + clusterService.submitStateUpdateTask("third time a charm", new SimpleTask(1), + ClusterStateTaskConfig.build(Priority.NORMAL), executor, listener); + + assertThat(latch.getCount(), equalTo(2L)); + } + latch.await(); + } + @TestLogging("cluster:TRACE") // To ensure that we log cluster state events on TRACE level public void testClusterStateUpdateLogging() throws Exception { MockLogAppender mockAppender = new MockLogAppender(); mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test1", "cluster.service", Level.DEBUG, - "*processing [test1]: took [1s] no change in cluster_state")); + "*processing [test1]: took [1s] no change in cluster_state")); mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test2", "cluster.service", Level.TRACE, - "*failed to execute cluster state update in [2s]*")); + "*failed to execute cluster state update in [2s]*")); mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test3", "cluster.service", Level.DEBUG, - "*processing [test3]: took [3s] done applying updated cluster_state (version: *, uuid: *)")); + "*processing [test3]: took [3s] done applying updated cluster_state (version: *, uuid: *)")); Logger rootLogger = Logger.getRootLogger(); rootLogger.addAppender(mockAppender); @@ -630,13 +762,13 @@ public class ClusterServiceTests extends ESTestCase { public void testLongClusterStateUpdateLogging() throws Exception { MockLogAppender mockAppender = new MockLogAppender(); mockAppender.addExpectation(new MockLogAppender.UnseenEventExpectation("test1 shouldn't see because setting is too low", - "cluster.service", Level.WARN, "*cluster state update task [test1] took [*] above the warn threshold of *")); + "cluster.service", Level.WARN, "*cluster state update task [test1] took [*] above the warn threshold of *")); mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test2", "cluster.service", Level.WARN, - "*cluster state update task [test2] took [32s] above the warn threshold of *")); + "*cluster state update task [test2] took [32s] above the warn threshold of *")); mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test3", "cluster.service", Level.WARN, - "*cluster state update task [test3] took [33s] above the warn threshold of *")); + "*cluster state update task [test3] took [33s] above the warn threshold of *")); mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test4", "cluster.service", Level.WARN, - "*cluster state update task [test4] took [34s] above the warn threshold of *")); + "*cluster state update task [test4] took [34s] above the warn threshold of *")); Logger rootLogger = Logger.getRootLogger(); rootLogger.addAppender(mockAppender); @@ -740,7 +872,25 @@ public class ClusterServiceTests extends ESTestCase { mockAppender.assertAllExpectationsMatched(); } - private static class BlockingTask extends ClusterStateUpdateTask { + private static class SimpleTask { + private final int id; + + private SimpleTask(int id) { + this.id = id; + } + + @Override + public int hashCode() { + return super.hashCode(); + } + + @Override + public boolean equals(Object obj) { + return super.equals(obj); + } + } + + private static class BlockingTask extends ClusterStateUpdateTask implements Releasable { private final CountDownLatch latch = new CountDownLatch(1); public BlockingTask(Priority priority) { @@ -757,7 +907,7 @@ public class ClusterServiceTests extends ESTestCase { public void onFailure(String source, Throwable t) { } - public void release() { + public void close() { latch.countDown(); } diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java index b7db9f9e609..27c38e66074 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java @@ -18,7 +18,6 @@ */ package org.elasticsearch.discovery.zen; -import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterState; @@ -99,11 +98,11 @@ public class NodeJoinControllerTests extends ESTestCase { final DiscoveryNode localNode = initialNodes.getLocalNode(); // make sure we have a master setState(clusterService, ClusterState.builder(clusterService.state()).nodes( - DiscoveryNodes.builder(initialNodes).masterNodeId(localNode.getId()))); + DiscoveryNodes.builder(initialNodes).masterNodeId(localNode.getId()))); nodeJoinController = new NodeJoinController(clusterService, new NoopRoutingService(Settings.EMPTY), - new ElectMasterService(Settings.EMPTY, Version.CURRENT), - new DiscoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)), - Settings.EMPTY); + new ElectMasterService(Settings.EMPTY, Version.CURRENT), + new DiscoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)), + Settings.EMPTY); } @After @@ -122,14 +121,14 @@ public class NodeJoinControllerTests extends ESTestCase { nodes.add(node); joinNode(node); } - nodeJoinController.startAccumulatingJoins(); + nodeJoinController.startElectionContext(); ArrayList> pendingJoins = new ArrayList<>(); for (int i = randomInt(5); i > 0; i--) { DiscoveryNode node = newNode(nodeId++); nodes.add(node); pendingJoins.add(joinNodeAsync(node)); } - nodeJoinController.stopAccumulatingJoins("test"); + nodeJoinController.stopElectionContext("test"); boolean hadSyncJoin = false; for (int i = randomInt(5); i > 0; i--) { DiscoveryNode node = newNode(nodeId++); @@ -145,8 +144,6 @@ public class NodeJoinControllerTests extends ESTestCase { for (Future joinFuture : pendingJoins) { joinFuture.get(); } - - assertNodesInCurrentState(nodes); } public void testFailingJoinsWhenNotMaster() throws ExecutionException, InterruptedException { @@ -163,14 +160,14 @@ public class NodeJoinControllerTests extends ESTestCase { logger.debug("--> testing joins fail post accumulation"); ArrayList> pendingJoins = new ArrayList<>(); - nodeJoinController.startAccumulatingJoins(); + nodeJoinController.startElectionContext(); for (int i = 1 + randomInt(5); i > 0; i--) { DiscoveryNode node = newNode(nodeId++); final Future future = joinNodeAsync(node); pendingJoins.add(future); assertThat(future.isDone(), equalTo(false)); } - nodeJoinController.stopAccumulatingJoins("test"); + nodeJoinController.stopElectionContext("test"); for (Future future : pendingJoins) { try { future.get(); @@ -197,7 +194,7 @@ public class NodeJoinControllerTests extends ESTestCase { } } - nodeJoinController.startAccumulatingJoins(); + nodeJoinController.startElectionContext(); final SimpleFuture electionFuture = new SimpleFuture("master election"); final Thread masterElection = new Thread(new AbstractRunnable() { @Override @@ -245,7 +242,7 @@ public class NodeJoinControllerTests extends ESTestCase { } } - nodeJoinController.startAccumulatingJoins(); + nodeJoinController.startElectionContext(); final SimpleFuture electionFuture = new SimpleFuture("master election"); final Thread masterElection = new Thread(new AbstractRunnable() { @Override @@ -334,8 +331,8 @@ public class NodeJoinControllerTests extends ESTestCase { } logger.debug("--> testing accumulation stopped"); - nodeJoinController.startAccumulatingJoins(); - nodeJoinController.stopAccumulatingJoins("test"); + nodeJoinController.startElectionContext(); + nodeJoinController.stopElectionContext("test"); } @@ -356,7 +353,7 @@ public class NodeJoinControllerTests extends ESTestCase { } } - nodeJoinController.startAccumulatingJoins(); + nodeJoinController.startElectionContext(); final int initialJoins = randomIntBetween(0, requiredJoins - 1); final ArrayList pendingJoins = new ArrayList<>(); ArrayList nodesToJoin = new ArrayList<>(); @@ -389,7 +386,7 @@ public class NodeJoinControllerTests extends ESTestCase { }); latch.await(); logger.debug("--> verifying election timed out"); - assertThat(failure.get(), instanceOf(ElasticsearchTimeoutException.class)); + assertThat(failure.get(), instanceOf(NotMasterException.class)); logger.debug("--> verifying all joins are failed"); for (SimpleFuture future : pendingJoins) { @@ -407,7 +404,7 @@ public class NodeJoinControllerTests extends ESTestCase { ClusterState state = clusterService.state(); final DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(state.nodes()); final DiscoveryNode other_node = new DiscoveryNode("other_node", DummyTransportAddress.INSTANCE, - emptyMap(), emptySet(), Version.CURRENT); + emptyMap(), emptySet(), Version.CURRENT); nodesBuilder.put(other_node); setState(clusterService, ClusterState.builder(state).nodes(nodesBuilder)); @@ -457,7 +454,7 @@ public class NodeJoinControllerTests extends ESTestCase { DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(clusterService.state().nodes()).masterNodeId(null); setState(clusterService, ClusterState.builder(clusterService.state()).nodes(nodesBuilder)); - nodeJoinController.startAccumulatingJoins(); + nodeJoinController.startElectionContext(); Thread[] threads = new Thread[3 + randomInt(5)]; final int requiredJoins = randomInt(threads.length); @@ -589,7 +586,9 @@ public class NodeJoinControllerTests extends ESTestCase { private SimpleFuture joinNodeAsync(final DiscoveryNode node) throws InterruptedException { final SimpleFuture future = new SimpleFuture("join of " + node + " (id [" + joinId.incrementAndGet() + "]"); logger.debug("starting {}", future); - nodeJoinController.handleJoinRequest(node, new MembershipAction.JoinCallback() { + // clone the node before submitting to simulate an incoming join, which is guaranteed to have a new + // disco node object serialized off the network + nodeJoinController.handleJoinRequest(cloneNode(node), new MembershipAction.JoinCallback() { @Override public void onSuccess() { logger.debug("{} completed", future); @@ -605,6 +604,14 @@ public class NodeJoinControllerTests extends ESTestCase { return future; } + /** + * creates an object clone of node, so it will be a different object instance + */ + private DiscoveryNode cloneNode(DiscoveryNode node) { + return new DiscoveryNode(node.getName(), node.getId(), node.getHostName(), node.getHostAddress(), node.getAddress(), + node.getAttributes(), node.getRoles(), node.getVersion()); + } + private void joinNode(final DiscoveryNode node) throws InterruptedException, ExecutionException { joinNodeAsync(node).get(); }