Simplify NodeJoinController to make use of new cluster state batching infra (#18832)

The NodeJoinController is responsible for processing joins from nodes, both normally and during master election. For both use cases, the class processes incoming joins in batches in order to be efficient and to accumulated enough joins (i.e., >= min_master_nodes) to seal an election and ensure the new cluster state can be committed. Since the class was written, we introduced a new infrastructure to support batch changes to the cluster state at the `ClusterService` level. This commit rewrites NodeJoinController to use that infra and be simpler.

The PR also introduces a new concept to ClusterService allowing to submit tasks in batches, guaranteeing that all tasks submitted in a batch will be processed together (potentially with more tasks).  On top of that I added some extra safety checks to the ClusterService, around potential double submission of task objects into the queue.

This is done in preparation to revive #17811
This commit is contained in:
Boaz Leskes 2016-06-17 09:22:15 +02:00 committed by GitHub
parent 600cbb6ab0
commit f256769179
8 changed files with 611 additions and 394 deletions

View File

@ -40,8 +40,10 @@ public interface ClusterStateTaskExecutor<T> {
/** /**
* Callback invoked after new cluster state is published. Note that * Callback invoked after new cluster state is published. Note that
* this method is not invoked if the cluster state was not updated. * this method is not invoked if the cluster state was not updated.
* @param clusterChangedEvent the change event for this cluster state change, containing
* both old and new states
*/ */
default void clusterStatePublished(ClusterState newClusterState) { default void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {
} }
/** /**
@ -92,7 +94,8 @@ public interface ClusterStateTaskExecutor<T> {
} }
private Builder<T> result(T task, TaskResult executionResult) { private Builder<T> result(T task, TaskResult executionResult) {
executionResults.put(task, executionResult); TaskResult existing = executionResults.put(task, executionResult);
assert existing == null : task + " already has result " + existing;
return this; return this;
} }

View File

@ -21,6 +21,7 @@ package org.elasticsearch.cluster.action.shard;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.ClusterStateTaskConfig; import org.elasticsearch.cluster.ClusterStateTaskConfig;
@ -312,8 +313,8 @@ public class ShardStateAction extends AbstractComponent {
} }
@Override @Override
public void clusterStatePublished(ClusterState newClusterState) { public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {
int numberOfUnassignedShards = newClusterState.getRoutingNodes().unassigned().size(); int numberOfUnassignedShards = clusterChangedEvent.state().getRoutingNodes().unassigned().size();
if (numberOfUnassignedShards > 0) { if (numberOfUnassignedShards > 0) {
String reason = String.format(Locale.ROOT, "[%d] unassigned shards after failing shards", numberOfUnassignedShards); String reason = String.format(Locale.ROOT, "[%d] unassigned shards after failing shards", numberOfUnassignedShards);
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {

View File

@ -614,5 +614,9 @@ public class DiscoveryNodes extends AbstractDiffable<DiscoveryNodes> implements
public static DiscoveryNodes readFrom(StreamInput in, @Nullable DiscoveryNode localNode) throws IOException { public static DiscoveryNodes readFrom(StreamInput in, @Nullable DiscoveryNode localNode) throws IOException {
return PROTO.readFrom(in, localNode); return PROTO.readFrom(in, localNode);
} }
public boolean isLocalNodeElectedMaster() {
return masterNodeId != null && masterNodeId.equals(localNodeId);
}
} }
} }

View File

@ -66,7 +66,9 @@ import org.elasticsearch.threadpool.ThreadPool;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
@ -371,34 +373,61 @@ public class ClusterService extends AbstractLifecycleComponent<ClusterService> {
public <T> void submitStateUpdateTask(final String source, final T task, public <T> void submitStateUpdateTask(final String source, final T task,
final ClusterStateTaskConfig config, final ClusterStateTaskConfig config,
final ClusterStateTaskExecutor<T> executor, final ClusterStateTaskExecutor<T> executor,
final ClusterStateTaskListener listener final ClusterStateTaskListener listener) {
) { submitStateUpdateTasks(source, Collections.singletonMap(task, listener), config, executor);
innerSubmitStateUpdateTask(source, task, config, executor, safe(listener, logger));
} }
private <T> void innerSubmitStateUpdateTask(final String source, final T task, /**
final ClusterStateTaskConfig config, * Submits a batch of cluster state update tasks; submitted updates are guaranteed to be processed together,
final ClusterStateTaskExecutor executor, * potentially with more tasks of the same executor.
final SafeClusterStateTaskListener listener) { *
* @param source the source of the cluster state update task
* @param tasks a map of update tasks and their corresponding listeners
* @param config the cluster state update task configuration
* @param executor the cluster state update task executor; tasks
* that share the same executor will be executed
* batches on this executor
* @param <T> the type of the cluster state update task state
*/
public <T> void submitStateUpdateTasks(final String source,
final Map<T, ClusterStateTaskListener> tasks, final ClusterStateTaskConfig config,
final ClusterStateTaskExecutor<T> executor) {
if (!lifecycle.started()) { if (!lifecycle.started()) {
return; return;
} }
if (tasks.isEmpty()) {
return;
}
try { try {
final UpdateTask<T> updateTask = new UpdateTask<>(source, task, config, executor, listener); // convert to an identity map to check for dups based on update tasks semantics of using identity instead of equal
final IdentityHashMap<T, ClusterStateTaskListener> tasksIdentity = new IdentityHashMap<>(tasks);
final List<UpdateTask<T>> updateTasks = tasksIdentity.entrySet().stream().map(
entry -> new UpdateTask<>(source, entry.getKey(), config, executor, safe(entry.getValue(), logger))
).collect(Collectors.toList());
synchronized (updateTasksPerExecutor) { synchronized (updateTasksPerExecutor) {
updateTasksPerExecutor.computeIfAbsent(executor, k -> new ArrayList<>()).add(updateTask); List<UpdateTask> existingTasks = updateTasksPerExecutor.computeIfAbsent(executor, k -> new ArrayList<>());
for (@SuppressWarnings("unchecked") UpdateTask<T> existing : existingTasks) {
if (tasksIdentity.containsKey(existing.task)) {
throw new IllegalArgumentException("task [" + existing.task + "] is already queued");
}
}
existingTasks.addAll(updateTasks);
} }
final UpdateTask<T> firstTask = updateTasks.get(0);
if (config.timeout() != null) { if (config.timeout() != null) {
updateTasksExecutor.execute(updateTask, threadPool.scheduler(), config.timeout(), () -> threadPool.generic().execute(() -> { updateTasksExecutor.execute(firstTask, threadPool.scheduler(), config.timeout(), () -> threadPool.generic().execute(() -> {
if (updateTask.processed.getAndSet(true) == false) { for (UpdateTask<T> task : updateTasks) {
if (task.processed.getAndSet(true) == false) {
logger.debug("cluster state update task [{}] timed out after [{}]", source, config.timeout()); logger.debug("cluster state update task [{}] timed out after [{}]", source, config.timeout());
listener.onFailure(source, new ProcessClusterEventTimeoutException(config.timeout(), source)); task.listener.onFailure(source, new ProcessClusterEventTimeoutException(config.timeout(), source));
}
} }
})); }));
} else { } else {
updateTasksExecutor.execute(updateTask); updateTasksExecutor.execute(firstTask);
} }
} catch (EsRejectedExecutionException e) { } catch (EsRejectedExecutionException e) {
// ignore cases where we are shutting down..., there is really nothing interesting // ignore cases where we are shutting down..., there is really nothing interesting
@ -681,7 +710,7 @@ public class ClusterService extends AbstractLifecycleComponent<ClusterService> {
} }
try { try {
executor.clusterStatePublished(newClusterState); executor.clusterStatePublished(clusterChangedEvent);
} catch (Exception e) { } catch (Exception e) {
logger.error("exception thrown while notifying executor of new cluster state publication [{}]", e, source); logger.error("exception thrown while notifying executor of new cluster state publication [{}]", e, source);
} }

View File

@ -18,9 +18,13 @@
*/ */
package org.elasticsearch.discovery.zen; package org.elasticsearch.discovery.zen;
import org.elasticsearch.ElasticsearchTimeoutException; import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
@ -30,21 +34,22 @@ import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority; import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.zen.elect.ElectMasterService; import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.discovery.zen.membership.MembershipAction; import org.elasticsearch.discovery.zen.membership.MembershipAction;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
/** /**
* This class processes incoming join request (passed zia {@link ZenDiscovery}). Incoming nodes * This class processes incoming join request (passed zia {@link ZenDiscovery}). Incoming nodes
@ -52,18 +57,17 @@ import java.util.concurrent.atomic.AtomicReference;
*/ */
public class NodeJoinController extends AbstractComponent { public class NodeJoinController extends AbstractComponent {
final ClusterService clusterService; private final ClusterService clusterService;
final RoutingService routingService; private final RoutingService routingService;
final ElectMasterService electMaster; private final ElectMasterService electMaster;
final DiscoverySettings discoverySettings; private final DiscoverySettings discoverySettings;
final AtomicBoolean accumulateJoins = new AtomicBoolean(false); private final JoinTaskExecutor joinTaskExecutor = new JoinTaskExecutor();
// this is site while trying to become a master // this is set while trying to become a master
final AtomicReference<ElectionContext> electionContext = new AtomicReference<>(); // mutation should be done under lock
private ElectionContext electionContext = null;
protected final Map<DiscoveryNode, List<MembershipAction.JoinCallback>> pendingJoinRequests = new HashMap<>();
public NodeJoinController(ClusterService clusterService, RoutingService routingService, ElectMasterService electMaster, DiscoverySettings discoverySettings, Settings settings) { public NodeJoinController(ClusterService clusterService, RoutingService routingService, ElectMasterService electMaster, DiscoverySettings discoverySettings, Settings settings) {
super(settings); super(settings);
this.clusterService = clusterService; this.clusterService = clusterService;
@ -75,7 +79,7 @@ public class NodeJoinController extends AbstractComponent {
/** /**
* waits for enough incoming joins from master eligible nodes to complete the master election * waits for enough incoming joins from master eligible nodes to complete the master election
* <p> * <p>
* You must start accumulating joins before calling this method. See {@link #startAccumulatingJoins()} * You must start accumulating joins before calling this method. See {@link #startElectionContext()}
* <p> * <p>
* The method will return once the local node has been elected as master or some failure/timeout has happened. * The method will return once the local node has been elected as master or some failure/timeout has happened.
* The exact outcome is communicated via the callback parameter, which is guaranteed to be called. * The exact outcome is communicated via the callback parameter, which is guaranteed to be called.
@ -86,29 +90,32 @@ public class NodeJoinController extends AbstractComponent {
* object * object
**/ **/
public void waitToBeElectedAsMaster(int requiredMasterJoins, TimeValue timeValue, final ElectionCallback callback) { public void waitToBeElectedAsMaster(int requiredMasterJoins, TimeValue timeValue, final ElectionCallback callback) {
assert accumulateJoins.get() : "waitToBeElectedAsMaster is called we are not accumulating joins";
final CountDownLatch done = new CountDownLatch(1); final CountDownLatch done = new CountDownLatch(1);
final ElectionContext newContext = new ElectionContext(callback, requiredMasterJoins) { final ElectionCallback wrapperCallback = new ElectionCallback() {
@Override @Override
void onClose() { public void onElectedAsMaster(ClusterState state) {
if (electionContext.compareAndSet(this, null)) {
stopAccumulatingJoins("election closed");
} else {
assert false : "failed to remove current election context";
}
done.countDown(); done.countDown();
callback.onElectedAsMaster(state);
}
@Override
public void onFailure(Throwable t) {
done.countDown();
callback.onFailure(t);
} }
}; };
if (electionContext.compareAndSet(null, newContext) == false) { ElectionContext myElectionContext = null;
// should never happen, but be conservative
failContext(newContext, new IllegalStateException("double waiting for election"));
return;
}
try { try {
// check what we have so far.. // check what we have so far..
// capture the context we add the callback to make sure we fail our own
synchronized (this) {
assert electionContext != null : "waitToBeElectedAsMaster is called we are not accumulating joins";
myElectionContext = electionContext;
electionContext.onAttemptToBeElected(requiredMasterJoins, wrapperCallback);
checkPendingJoinsAndElectIfNeeded(); checkPendingJoinsAndElectIfNeeded();
}
try { try {
if (done.await(timeValue.millis(), TimeUnit.MILLISECONDS)) { if (done.await(timeValue.millis(), TimeUnit.MILLISECONDS)) {
@ -119,69 +126,46 @@ public class NodeJoinController extends AbstractComponent {
} }
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
final int pendingNodes; final int pendingNodes = myElectionContext.getPendingMasterJoinsCount();
synchronized (pendingJoinRequests) { logger.trace("timed out waiting to be elected. waited [{}]. pending master node joins [{}]", timeValue, pendingNodes);
pendingNodes = pendingJoinRequests.size();
} }
logger.trace("timed out waiting to be elected. waited [{}]. pending node joins [{}]", timeValue, pendingNodes); failContextIfNeeded(myElectionContext, "timed out waiting to be elected");
}
// callback will clear the context, if it's active
failContext(newContext, new ElasticsearchTimeoutException("timed out waiting to be elected"));
} catch (Throwable t) { } catch (Throwable t) {
logger.error("unexpected failure while waiting for incoming joins", t); logger.error("unexpected failure while waiting for incoming joins", t);
failContext(newContext, "unexpected failure while waiting for pending joins", t); if (myElectionContext != null) {
failContextIfNeeded(myElectionContext, "unexpected failure while waiting for pending joins [" + t.getMessage() + "]");
}
} }
} }
private void failContext(final ElectionContext context, final Throwable throwable) { /**
failContext(context, throwable.getMessage(), throwable); * utility method to fail the given election context under the cluster state thread
*/
private synchronized void failContextIfNeeded(final ElectionContext context, final String reason) {
if (electionContext == context) {
stopElectionContext(reason);
} }
/** utility method to fail the given election context under the cluster state thread */
private void failContext(final ElectionContext context, final String reason, final Throwable throwable) {
clusterService.submitStateUpdateTask("zen-disco-join(failure [" + reason + "])", new ClusterStateUpdateTask(Priority.IMMEDIATE) {
@Override
public boolean runOnlyOnMaster() {
return false;
}
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
context.onFailure(throwable);
return currentState;
}
@Override
public void onFailure(String source, Throwable updateFailure) {
logger.warn("unexpected error while trying to fail election context due to [{}]. original exception [{}]", updateFailure, reason, throwable);
context.onFailure(updateFailure);
}
});
} }
/** /**
* Accumulates any future incoming join request. Pending join requests will be processed in the final steps of becoming a * Accumulates any future incoming join request. Pending join requests will be processed in the final steps of becoming a
* master or when {@link #stopAccumulatingJoins(String)} is called. * master or when {@link #stopElectionContext(String)} is called.
*/ */
public void startAccumulatingJoins() { public synchronized void startElectionContext() {
logger.trace("starting to accumulate joins"); logger.trace("starting an election context, will accumulate joins");
boolean b = accumulateJoins.getAndSet(true); assert electionContext == null : "double startElectionContext() calls";
assert b == false : "double startAccumulatingJoins() calls"; electionContext = new ElectionContext();
assert electionContext.get() == null : "startAccumulatingJoins() called, but there is an ongoing election context";
} }
/** Stopped accumulating joins. All pending joins will be processed. Future joins will be processed immediately */ /**
public void stopAccumulatingJoins(String reason) { * Stopped accumulating joins. All pending joins will be processed. Future joins will be processed immediately
logger.trace("stopping join accumulation ([{}])", reason); */
assert electionContext.get() == null : "stopAccumulatingJoins() called, but there is an ongoing election context"; public void stopElectionContext(String reason) {
boolean b = accumulateJoins.getAndSet(false); logger.trace("stopping election ([{}])", reason);
assert b : "stopAccumulatingJoins() called but not accumulating"; synchronized (this) {
synchronized (pendingJoinRequests) { assert electionContext != null : "stopElectionContext() called but not accumulating";
if (pendingJoinRequests.size() > 0) { electionContext.closeAndProcessPending(reason);
processJoins("pending joins after accumulation stop [" + reason + "]"); electionContext = null;
}
} }
} }
@ -190,19 +174,14 @@ public class NodeJoinController extends AbstractComponent {
* <p> * <p>
* Note: doesn't do any validation. This should have been done before. * Note: doesn't do any validation. This should have been done before.
*/ */
public void handleJoinRequest(final DiscoveryNode node, final MembershipAction.JoinCallback callback) { public synchronized void handleJoinRequest(final DiscoveryNode node, final MembershipAction.JoinCallback callback) {
synchronized (pendingJoinRequests) { if (electionContext != null) {
List<MembershipAction.JoinCallback> nodeCallbacks = pendingJoinRequests.get(node); electionContext.addIncomingJoin(node, callback);
if (nodeCallbacks == null) {
nodeCallbacks = new ArrayList<>();
pendingJoinRequests.put(node, nodeCallbacks);
}
nodeCallbacks.add(callback);
}
if (accumulateJoins.get() == false) {
processJoins("join from node[" + node + "]");
} else {
checkPendingJoinsAndElectIfNeeded(); checkPendingJoinsAndElectIfNeeded();
} else {
clusterService.submitStateUpdateTask("zen-disco-join(node " + node + "])",
node, ClusterStateTaskConfig.build(Priority.URGENT),
joinTaskExecutor, new JoinTaskListener(callback, logger));
} }
} }
@ -210,85 +189,23 @@ public class NodeJoinController extends AbstractComponent {
* checks if there is an on going request to become master and if it has enough pending joins. If so, the node will * checks if there is an on going request to become master and if it has enough pending joins. If so, the node will
* become master via a ClusterState update task. * become master via a ClusterState update task.
*/ */
private void checkPendingJoinsAndElectIfNeeded() { private synchronized void checkPendingJoinsAndElectIfNeeded() {
assert accumulateJoins.get() : "election check requested but we are not accumulating joins"; assert electionContext != null : "election check requested but no active context";
final ElectionContext context = electionContext.get(); final int pendingMasterJoins = electionContext.getPendingMasterJoinsCount();
if (context == null) { if (electionContext.isEnoughPendingJoins(pendingMasterJoins) == false) {
return; if (logger.isTraceEnabled()) {
logger.trace("not enough joins for election. Got [{}], required [{}]", pendingMasterJoins,
electionContext.requiredMasterJoins);
} }
} else {
int pendingMasterJoins = 0; if (logger.isTraceEnabled()) {
synchronized (pendingJoinRequests) { logger.trace("have enough joins for election. Got [{}], required [{}]", pendingMasterJoins,
for (DiscoveryNode node : pendingJoinRequests.keySet()) { electionContext.requiredMasterJoins);
if (node.isMasterNode()) { }
pendingMasterJoins++; electionContext.closeAndBecomeMaster();
electionContext = null; // clear this out so future joins won't be accumulated
} }
} }
}
if (pendingMasterJoins < context.requiredMasterJoins) {
if (context.pendingSetAsMasterTask.get() == false) {
logger.trace("not enough joins for election. Got [{}], required [{}]", pendingMasterJoins, context.requiredMasterJoins);
}
return;
}
if (context.pendingSetAsMasterTask.getAndSet(true)) {
logger.trace("elected as master task already submitted, ignoring...");
return;
}
final String source = "zen-disco-join(elected_as_master, [" + pendingMasterJoins + "] joins received)";
clusterService.submitStateUpdateTask(source, new ProcessJoinsTask(Priority.IMMEDIATE) {
@Override
public ClusterState execute(ClusterState currentState) {
// Take into account the previous known nodes, if they happen not to be available
// then fault detection will remove these nodes.
if (currentState.nodes().getMasterNode() != null) {
// TODO can we tie break here? we don't have a remote master cluster state version to decide on
logger.trace("join thread elected local node as master, but there is already a master in place: {}", currentState.nodes().getMasterNode());
throw new NotMasterException("Node [" + clusterService.localNode() + "] not master for join request");
}
DiscoveryNodes.Builder builder = new DiscoveryNodes.Builder(currentState.nodes()).masterNodeId(currentState.nodes().getLocalNode().getId());
// update the fact that we are the master...
ClusterBlocks clusterBlocks = ClusterBlocks.builder().blocks(currentState.blocks()).removeGlobalBlock(discoverySettings.getNoMasterBlock()).build();
currentState = ClusterState.builder(currentState).nodes(builder).blocks(clusterBlocks).build();
// reroute now to remove any dead nodes (master may have stepped down when they left and didn't update the routing table)
RoutingAllocation.Result result = routingService.getAllocationService().reroute(currentState, "nodes joined");
if (result.changed()) {
currentState = ClusterState.builder(currentState).routingResult(result).build();
}
// Add the incoming join requests.
// Note: we only do this now (after the reroute) to avoid assigning shards to these nodes.
return super.execute(currentState);
}
@Override
public boolean runOnlyOnMaster() {
return false;
}
@Override
public void onFailure(String source, Throwable t) {
super.onFailure(source, t);
context.onFailure(t);
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
super.clusterStateProcessed(source, oldState, newState);
context.onElectedAsMaster(newState);
}
});
}
/** process all pending joins */
private void processJoins(String reason) {
clusterService.submitStateUpdateTask("zen-disco-join(" + reason + ")", new ProcessJoinsTask(Priority.URGENT));
}
public interface ElectionCallback { public interface ElectionCallback {
/** /**
@ -304,119 +221,143 @@ public class NodeJoinController extends AbstractComponent {
void onFailure(Throwable t); void onFailure(Throwable t);
} }
static abstract class ElectionContext implements ElectionCallback { class ElectionContext {
private final ElectionCallback callback; private ElectionCallback callback = null;
private final int requiredMasterJoins; private int requiredMasterJoins = -1;
private final Map<DiscoveryNode, List<MembershipAction.JoinCallback>> joinRequestAccumulator = new HashMap<>();
/** set to true after enough joins have been seen and a cluster update task is submitted to become master */
final AtomicBoolean pendingSetAsMasterTask = new AtomicBoolean();
final AtomicBoolean closed = new AtomicBoolean(); final AtomicBoolean closed = new AtomicBoolean();
ElectionContext(ElectionCallback callback, int requiredMasterJoins) { public synchronized void onAttemptToBeElected(int requiredMasterJoins, ElectionCallback callback) {
this.callback = callback; ensureOpen();
assert this.requiredMasterJoins < 0;
assert this.callback == null;
this.requiredMasterJoins = requiredMasterJoins; this.requiredMasterJoins = requiredMasterJoins;
this.callback = callback;
} }
abstract void onClose(); public synchronized void addIncomingJoin(DiscoveryNode node, MembershipAction.JoinCallback callback) {
ensureOpen();
joinRequestAccumulator.computeIfAbsent(node, n -> new ArrayList<>()).add(callback);
}
@Override
public void onElectedAsMaster(ClusterState state) { public synchronized boolean isEnoughPendingJoins(int pendingMasterJoins) {
assert pendingSetAsMasterTask.get() : "onElectedAsMaster called but pendingSetAsMasterTask is not set"; final boolean hasEnough;
if (requiredMasterJoins < 0) {
// requiredMasterNodes is unknown yet, return false and keep on waiting
hasEnough = false;
} else {
assert callback != null : "requiredMasterJoins is set but not the callback";
hasEnough = pendingMasterJoins >= requiredMasterJoins;
}
return hasEnough;
}
private Map<DiscoveryNode, ClusterStateTaskListener> getPendingAsTasks() {
Map<DiscoveryNode, ClusterStateTaskListener> tasks = new HashMap<>();
joinRequestAccumulator.entrySet().stream().forEach(e -> tasks.put(e.getKey(), new JoinTaskListener(e.getValue(), logger)));
return tasks;
}
public synchronized int getPendingMasterJoinsCount() {
int pendingMasterJoins = 0;
for (DiscoveryNode node : joinRequestAccumulator.keySet()) {
if (node.isMasterNode()) {
pendingMasterJoins++;
}
}
return pendingMasterJoins;
}
public synchronized void closeAndBecomeMaster() {
assert callback != null : "becoming a master but the callback is not yet set";
assert isEnoughPendingJoins(getPendingMasterJoinsCount()) : "becoming a master but pending joins of "
+ getPendingMasterJoinsCount() + " are not enough. needs [" + requiredMasterJoins + "];";
innerClose();
Map<DiscoveryNode, ClusterStateTaskListener> tasks = getPendingAsTasks();
final String source = "zen-disco-join(elected_as_master, [" + tasks.size() + "] nodes joined)";
tasks.put(BECOME_MASTER_TASK, joinProcessedListener);
clusterService.submitStateUpdateTasks(source, tasks, ClusterStateTaskConfig.build(Priority.URGENT), joinTaskExecutor);
}
public synchronized void closeAndProcessPending(String reason) {
innerClose();
Map<DiscoveryNode, ClusterStateTaskListener> tasks = getPendingAsTasks();
final String source = "zen-disco-join(election stopped [" + reason + "] nodes joined";
tasks.put(FINISH_ELECTION_NOT_MASTER_TASK, joinProcessedListener);
clusterService.submitStateUpdateTasks(source, tasks, ClusterStateTaskConfig.build(Priority.URGENT), joinTaskExecutor);
}
private void innerClose() {
if (closed.getAndSet(true)) {
throw new AlreadyClosedException("election context is already closed");
}
}
private void ensureOpen() {
if (closed.get()) {
throw new AlreadyClosedException("election context is already closed");
}
}
private synchronized ElectionCallback getCallback() {
return callback;
}
private void onElectedAsMaster(ClusterState state) {
ClusterService.assertClusterStateThread(); ClusterService.assertClusterStateThread();
assert state.nodes().isLocalNodeElectedMaster() : "onElectedAsMaster called but local node is not master"; assert state.nodes().isLocalNodeElectedMaster() : "onElectedAsMaster called but local node is not master";
if (closed.compareAndSet(false, true)) { ElectionCallback callback = getCallback(); // get under lock
try { if (callback != null) {
onClose();
} finally {
callback.onElectedAsMaster(state); callback.onElectedAsMaster(state);
} }
} }
}
@Override private void onFailure(Throwable t) {
public void onFailure(Throwable t) {
ClusterService.assertClusterStateThread(); ClusterService.assertClusterStateThread();
if (closed.compareAndSet(false, true)) { ElectionCallback callback = getCallback(); // get under lock
try { if (callback != null) {
onClose();
} finally {
callback.onFailure(t); callback.onFailure(t);
} }
} }
}
}
private final ClusterStateTaskListener joinProcessedListener = new ClusterStateTaskListener() {
/** @Override
* Processes any pending joins via a ClusterState update task. public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
* Note: this task automatically fails (and fails all pending joins) if the current node is not marked as master assert newState.nodes().isLocalNodeElectedMaster() : "should have become a master but isn't " + newState.prettyPrint();
*/ onElectedAsMaster(newState);
class ProcessJoinsTask extends ClusterStateUpdateTask {
private final List<MembershipAction.JoinCallback> joinCallbacksToRespondTo = new ArrayList<>();
private boolean nodeAdded = false;
public ProcessJoinsTask(Priority priority) {
super(priority);
} }
@Override @Override
public ClusterState execute(ClusterState currentState) { public void onFailure(String source, Throwable t) {
DiscoveryNodes.Builder nodesBuilder; ElectionContext.this.onFailure(t);
synchronized (pendingJoinRequests) { }
if (pendingJoinRequests.isEmpty()) { };
return currentState;
} }
nodesBuilder = DiscoveryNodes.builder(currentState.nodes()); static class JoinTaskListener implements ClusterStateTaskListener {
Iterator<Map.Entry<DiscoveryNode, List<MembershipAction.JoinCallback>>> iterator = pendingJoinRequests.entrySet().iterator(); final List<MembershipAction.JoinCallback> callbacks;
while (iterator.hasNext()) { final private ESLogger logger;
Map.Entry<DiscoveryNode, List<MembershipAction.JoinCallback>> entry = iterator.next();
final DiscoveryNode node = entry.getKey(); JoinTaskListener(MembershipAction.JoinCallback callback, ESLogger logger) {
joinCallbacksToRespondTo.addAll(entry.getValue()); this(Collections.singletonList(callback), logger);
iterator.remove();
if (currentState.nodes().nodeExists(node.getId())) {
logger.debug("received a join request for an existing node [{}]", node);
} else {
nodeAdded = true;
nodesBuilder.put(node);
for (DiscoveryNode existingNode : currentState.nodes()) {
if (node.getAddress().equals(existingNode.getAddress())) {
nodesBuilder.remove(existingNode.getId());
logger.warn("received join request from node [{}], but found existing node {} with same address, removing existing node", node, existingNode);
}
}
}
}
} }
// we must return a new cluster state instance to force publishing. This is important JoinTaskListener(List<MembershipAction.JoinCallback> callbacks, ESLogger logger) {
// for the joining node to finalize it's join and set us as a master this.callbacks = callbacks;
final ClusterState.Builder newState = ClusterState.builder(currentState); this.logger = logger;
if (nodeAdded) {
newState.nodes(nodesBuilder);
}
return newState.build();
} }
@Override @Override
public void onNoLongerMaster(String source) { public void onFailure(String source, Throwable t) {
// we are rejected, so drain all pending task (execute never run) for (MembershipAction.JoinCallback callback : callbacks) {
synchronized (pendingJoinRequests) {
Iterator<Map.Entry<DiscoveryNode, List<MembershipAction.JoinCallback>>> iterator = pendingJoinRequests.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<DiscoveryNode, List<MembershipAction.JoinCallback>> entry = iterator.next();
joinCallbacksToRespondTo.addAll(entry.getValue());
iterator.remove();
}
}
Exception e = new NotMasterException("Node [" + clusterService.localNode() + "] not master for join request");
innerOnFailure(e);
}
void innerOnFailure(Throwable t) {
for (MembershipAction.JoinCallback callback : joinCallbacksToRespondTo) {
try { try {
callback.onFailure(t); callback.onFailure(t);
} catch (Exception e) { } catch (Exception e) {
@ -425,29 +366,111 @@ public class NodeJoinController extends AbstractComponent {
} }
} }
@Override
public void onFailure(String source, Throwable t) {
logger.error("unexpected failure during [{}]", t, source);
innerOnFailure(t);
}
@Override @Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
if (nodeAdded) { for (MembershipAction.JoinCallback callback : callbacks) {
// we reroute not in the same cluster state update since in certain areas we rely on
// the node to be in the cluster state (sampled from ClusterService#state) to be there, also
// shard transitions need to better be handled in such cases
routingService.reroute("post_node_add");
}
for (MembershipAction.JoinCallback callback : joinCallbacksToRespondTo) {
try { try {
callback.onSuccess(); callback.onSuccess();
} catch (Exception e) { } catch (Exception e) {
logger.error("unexpected error during [{}]", e, source); logger.error("unexpected error during [{}]", e, source);
} }
} }
}
}
NodeJoinController.this.electMaster.logMinimumMasterNodesWarningIfNecessary(oldState, newState); // a task indicated that the current node should become master, if no current master is known
private final static DiscoveryNode BECOME_MASTER_TASK = new DiscoveryNode("_BECOME_MASTER_TASK_", DummyTransportAddress.INSTANCE,
Collections.emptyMap(), Collections.emptySet(), Version.CURRENT);
// a task that is used to process pending joins without explicitly becoming a master and listening to the results
// this task is used when election is stop without the local node becoming a master per se (though it might
private final static DiscoveryNode FINISH_ELECTION_NOT_MASTER_TASK = new DiscoveryNode("_NOT_MASTER_TASK_",
DummyTransportAddress.INSTANCE, Collections.emptyMap(), Collections.emptySet(), Version.CURRENT);
class JoinTaskExecutor implements ClusterStateTaskExecutor<DiscoveryNode> {
@Override
public BatchResult<DiscoveryNode> execute(ClusterState currentState, List<DiscoveryNode> joiningNodes) throws Exception {
final DiscoveryNodes currentNodes = currentState.nodes();
final BatchResult.Builder<DiscoveryNode> results = BatchResult.builder();
boolean nodesChanged = false;
ClusterState.Builder newState = ClusterState.builder(currentState);
DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(currentNodes);
if (currentNodes.getMasterNode() == null && joiningNodes.contains(BECOME_MASTER_TASK)) {
// use these joins to try and become the master.
// Note that we don't have to do any validation of the amount of joining nodes - the commit
// during the cluster state publishing guarantees that we have enough
nodesBuilder.masterNodeId(currentNodes.getLocalNodeId());
ClusterBlocks clusterBlocks = ClusterBlocks.builder().blocks(currentState.blocks())
.removeGlobalBlock(discoverySettings.getNoMasterBlock()).build();
newState.blocks(clusterBlocks);
newState.nodes(nodesBuilder);
nodesChanged = true;
// reroute now to remove any dead nodes (master may have stepped down when they left and didn't update the routing table)
// Note: also do it now to avoid assigning shards to these nodes. We will have another reroute after the cluster
// state is published.
// TODO: this publishing of a cluster state with no nodes assigned to joining nodes shouldn't be needed anymore. remove.
final ClusterState tmpState = newState.build();
RoutingAllocation.Result result = routingService.getAllocationService().reroute(tmpState, "nodes joined");
newState = ClusterState.builder(tmpState);
if (result.changed()) {
newState.routingResult(result);
}
nodesBuilder = DiscoveryNodes.builder(tmpState.nodes());
}
if (nodesBuilder.isLocalNodeElectedMaster() == false) {
logger.trace("processing node joins, but we are not the master. current master: {}", currentNodes.getMasterNode());
throw new NotMasterException("Node [" + currentNodes.getLocalNode() + "] not master for join request");
}
for (final DiscoveryNode node : joiningNodes) {
if (node.equals(BECOME_MASTER_TASK) || node.equals(FINISH_ELECTION_NOT_MASTER_TASK)) {
// noop
} else if (currentNodes.nodeExists(node.getId())) {
logger.debug("received a join request for an existing node [{}]", node);
} else {
nodesChanged = true;
nodesBuilder.put(node);
for (DiscoveryNode existingNode : currentNodes) {
if (node.getAddress().equals(existingNode.getAddress())) {
nodesBuilder.remove(existingNode.getId());
logger.warn("received join request from node [{}], but found existing node {} with same address, removing existing node", node, existingNode);
}
}
}
results.success(node);
}
if (nodesChanged) {
newState.nodes(nodesBuilder);
}
// we must return a new cluster state instance to force publishing. This is important
// for the joining node to finalize its join and set us as a master
return results.build(newState.build());
}
@Override
public boolean runOnlyOnMaster() {
// we validate that we are allowed to change the cluster state during cluster state processing
return false;
}
@Override
public void clusterStatePublished(ClusterChangedEvent event) {
if (event.nodesDelta().hasChanges()) {
// we reroute not in the same cluster state update since in certain areas we rely on
// the node to be in the cluster state (sampled from ClusterService#state) to be there, also
// shard transitions need to better be handled in such cases
routingService.reroute("post_node_add");
}
NodeJoinController.this.electMaster.logMinimumMasterNodesWarningIfNecessary(event.previousState(), event.state());
} }
} }
} }

View File

@ -372,7 +372,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
private void innerJoinCluster() { private void innerJoinCluster() {
DiscoveryNode masterNode = null; DiscoveryNode masterNode = null;
final Thread currentThread = Thread.currentThread(); final Thread currentThread = Thread.currentThread();
nodeJoinController.startAccumulatingJoins(); nodeJoinController.startElectionContext();
while (masterNode == null && joinThreadControl.joinThreadActive(currentThread)) { while (masterNode == null && joinThreadControl.joinThreadActive(currentThread)) {
masterNode = findMaster(); masterNode = findMaster();
} }
@ -406,7 +406,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
); );
} else { } else {
// process any incoming joins (they will fail because we are not the master) // process any incoming joins (they will fail because we are not the master)
nodeJoinController.stopAccumulatingJoins("not master"); nodeJoinController.stopElectionContext("not master");
// send join request // send join request
final boolean success = joinElectedMaster(masterNode); final boolean success = joinElectedMaster(masterNode);

View File

@ -20,6 +20,7 @@ package org.elasticsearch.cluster.service;
import org.apache.log4j.Level; import org.apache.log4j.Level;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterName;
@ -35,6 +36,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.OperationRouting; import org.elasticsearch.cluster.routing.OperationRouting;
import org.elasticsearch.common.Priority; import org.elasticsearch.common.Priority;
import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.DummyTransportAddress; import org.elasticsearch.common.transport.DummyTransportAddress;
@ -51,8 +53,10 @@ import org.junit.BeforeClass;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
@ -62,12 +66,15 @@ import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static java.util.Collections.emptyMap; import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet; import static java.util.Collections.emptySet;
import static org.elasticsearch.test.ClusterServiceUtils.setState; import static org.elasticsearch.test.ClusterServiceUtils.setState;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
public class ClusterServiceTests extends ESTestCase { public class ClusterServiceTests extends ESTestCase {
@ -269,7 +276,7 @@ public class ClusterServiceTests extends ESTestCase {
} }
@Override @Override
public void clusterStatePublished(ClusterState newClusterState) { public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {
published.set(true); published.set(true);
latch.countDown(); latch.countDown();
} }
@ -370,10 +377,43 @@ public class ClusterServiceTests extends ESTestCase {
} }
} }
public void testSingleBatchSubmission() throws InterruptedException {
Map<Integer, ClusterStateTaskListener> tasks = new HashMap<>();
final int numOfTasks = randomInt(10);
final CountDownLatch latch = new CountDownLatch(numOfTasks);
for (int i = 0; i < numOfTasks; i++) {
tasks.put(randomInt(1024), new ClusterStateTaskListener() {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
latch.countDown();
}
@Override
public void onFailure(String source, Throwable t) {
fail(ExceptionsHelper.detailedMessage(t));
}
});
}
clusterService.submitStateUpdateTasks("test", tasks, ClusterStateTaskConfig.build(Priority.LANGUID),
(currentState, taskList) -> {
assertThat(taskList.size(), equalTo(tasks.size()));
assertThat(taskList.stream().collect(Collectors.toSet()), equalTo(tasks.keySet()));
return ClusterStateTaskExecutor.BatchResult.<Integer>builder().successes(taskList).build(currentState);
});
latch.await();
}
public void testClusterStateBatchedUpdates() throws BrokenBarrierException, InterruptedException { public void testClusterStateBatchedUpdates() throws BrokenBarrierException, InterruptedException {
AtomicInteger counter = new AtomicInteger(); AtomicInteger counter = new AtomicInteger();
class Task { class Task {
private AtomicBoolean state = new AtomicBoolean(); private AtomicBoolean state = new AtomicBoolean();
private final int id;
Task(int id) {
this.id = id;
}
public void execute() { public void execute() {
if (!state.compareAndSet(false, true)) { if (!state.compareAndSet(false, true)) {
@ -382,21 +422,54 @@ public class ClusterServiceTests extends ESTestCase {
counter.incrementAndGet(); counter.incrementAndGet();
} }
} }
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Task task = (Task) o;
return id == task.id;
}
@Override
public int hashCode() {
return id;
}
@Override
public String toString() {
return Integer.toString(id);
}
} }
int numberOfThreads = randomIntBetween(2, 8); int numberOfThreads = randomIntBetween(2, 8);
int tasksSubmittedPerThread = randomIntBetween(1, 1024); int taskSubmissionsPerThread = randomIntBetween(1, 64);
int numberOfExecutors = Math.max(1, numberOfThreads / 4); int numberOfExecutors = Math.max(1, numberOfThreads / 4);
final Semaphore semaphore = new Semaphore(numberOfExecutors); final Semaphore semaphore = new Semaphore(numberOfExecutors);
class TaskExecutor implements ClusterStateTaskExecutor<Task> { class TaskExecutor implements ClusterStateTaskExecutor<Task> {
private final List<Set<Task>> taskGroups;
private AtomicInteger counter = new AtomicInteger(); private AtomicInteger counter = new AtomicInteger();
private AtomicInteger batches = new AtomicInteger(); private AtomicInteger batches = new AtomicInteger();
private AtomicInteger published = new AtomicInteger(); private AtomicInteger published = new AtomicInteger();
public TaskExecutor(List<Set<Task>> taskGroups) {
this.taskGroups = taskGroups;
}
@Override @Override
public BatchResult<Task> execute(ClusterState currentState, List<Task> tasks) throws Exception { public BatchResult<Task> execute(ClusterState currentState, List<Task> tasks) throws Exception {
tasks.forEach(task -> task.execute()); for (Set<Task> expectedSet : taskGroups) {
long count = tasks.stream().filter(expectedSet::contains).count();
assertThat("batched set should be executed together or not at all. Expected " + expectedSet + "s. Executing " + tasks,
count, anyOf(equalTo(0L), equalTo((long) expectedSet.size())));
}
tasks.forEach(Task::execute);
counter.addAndGet(tasks.size()); counter.addAndGet(tasks.size());
ClusterState maybeUpdatedClusterState = currentState; ClusterState maybeUpdatedClusterState = currentState;
if (randomBoolean()) { if (randomBoolean()) {
@ -413,59 +486,85 @@ public class ClusterServiceTests extends ESTestCase {
} }
@Override @Override
public void clusterStatePublished(ClusterState newClusterState) { public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {
published.incrementAndGet(); published.incrementAndGet();
semaphore.release(); semaphore.release();
} }
} }
ConcurrentMap<String, AtomicInteger> counters = new ConcurrentHashMap<>(); ConcurrentMap<String, AtomicInteger> processedStates = new ConcurrentHashMap<>();
CountDownLatch updateLatch = new CountDownLatch(numberOfThreads * tasksSubmittedPerThread);
ClusterStateTaskListener listener = new ClusterStateTaskListener() {
@Override
public void onFailure(String source, Throwable t) {
assert false;
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
counters.computeIfAbsent(source, key -> new AtomicInteger()).incrementAndGet();
updateLatch.countDown();
}
};
List<Set<Task>> taskGroups = new ArrayList<>();
List<TaskExecutor> executors = new ArrayList<>(); List<TaskExecutor> executors = new ArrayList<>();
for (int i = 0; i < numberOfExecutors; i++) { for (int i = 0; i < numberOfExecutors; i++) {
executors.add(new TaskExecutor()); executors.add(new TaskExecutor(taskGroups));
} }
// randomly assign tasks to executors // randomly assign tasks to executors
List<TaskExecutor> assignments = new ArrayList<>(); List<Tuple<TaskExecutor, Set<Task>>> assignments = new ArrayList<>();
int taskId = 0;
for (int i = 0; i < numberOfThreads; i++) { for (int i = 0; i < numberOfThreads; i++) {
for (int j = 0; j < tasksSubmittedPerThread; j++) { for (int j = 0; j < taskSubmissionsPerThread; j++) {
assignments.add(randomFrom(executors)); TaskExecutor executor = randomFrom(executors);
Set<Task> tasks = new HashSet<>();
for (int t = randomInt(3); t >= 0; t--) {
tasks.add(new Task(taskId++));
}
taskGroups.add(tasks);
assignments.add(Tuple.tuple(executor, tasks));
} }
} }
Map<TaskExecutor, Integer> counts = new HashMap<>(); Map<TaskExecutor, Integer> counts = new HashMap<>();
for (TaskExecutor executor : assignments) { int totalTaskCount = 0;
counts.merge(executor, 1, (previous, one) -> previous + one); for (Tuple<TaskExecutor, Set<Task>> assignment : assignments) {
final int taskCount = assignment.v2().size();
counts.merge(assignment.v1(), taskCount, (previous, count) -> previous + count);
totalTaskCount += taskCount;
}
final CountDownLatch updateLatch = new CountDownLatch(totalTaskCount);
final ClusterStateTaskListener listener = new ClusterStateTaskListener() {
@Override
public void onFailure(String source, Throwable t) {
fail(ExceptionsHelper.detailedMessage(t));
} }
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
processedStates.computeIfAbsent(source, key -> new AtomicInteger()).incrementAndGet();
updateLatch.countDown();
}
};
final ConcurrentMap<String, AtomicInteger> submittedTasksPerThread = new ConcurrentHashMap<>();
CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads); CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads);
for (int i = 0; i < numberOfThreads; i++) { for (int i = 0; i < numberOfThreads; i++) {
final int index = i; final int index = i;
Thread thread = new Thread(() -> { Thread thread = new Thread(() -> {
final String threadName = Thread.currentThread().getName();
try { try {
barrier.await(); barrier.await();
for (int j = 0; j < tasksSubmittedPerThread; j++) { for (int j = 0; j < taskSubmissionsPerThread; j++) {
ClusterStateTaskExecutor<Task> executor = assignments.get(index * tasksSubmittedPerThread + j); Tuple<TaskExecutor, Set<Task>> assignment = assignments.get(index * taskSubmissionsPerThread + j);
final Set<Task> tasks = assignment.v2();
submittedTasksPerThread.computeIfAbsent(threadName, key -> new AtomicInteger()).addAndGet(tasks.size());
final TaskExecutor executor = assignment.v1();
if (tasks.size() == 1) {
clusterService.submitStateUpdateTask( clusterService.submitStateUpdateTask(
Thread.currentThread().getName(), threadName,
new Task(), tasks.stream().findFirst().get(),
ClusterStateTaskConfig.build(randomFrom(Priority.values())), ClusterStateTaskConfig.build(randomFrom(Priority.values())),
executor, executor,
listener); listener);
} else {
Map<Task, ClusterStateTaskListener> taskListeners = new HashMap<>();
tasks.stream().forEach(t -> taskListeners.put(t, listener));
clusterService.submitStateUpdateTasks(
threadName,
taskListeners, ClusterStateTaskConfig.build(randomFrom(Priority.values())),
executor
);
}
} }
barrier.await(); barrier.await();
} catch (BrokenBarrierException | InterruptedException e) { } catch (BrokenBarrierException | InterruptedException e) {
@ -486,7 +585,7 @@ public class ClusterServiceTests extends ESTestCase {
semaphore.acquire(numberOfExecutors); semaphore.acquire(numberOfExecutors);
// assert the number of executed tasks is correct // assert the number of executed tasks is correct
assertEquals(numberOfThreads * tasksSubmittedPerThread, counter.get()); assertEquals(totalTaskCount, counter.get());
// assert each executor executed the correct number of tasks // assert each executor executed the correct number of tasks
for (TaskExecutor executor : executors) { for (TaskExecutor executor : executors) {
@ -497,8 +596,10 @@ public class ClusterServiceTests extends ESTestCase {
} }
// assert the correct number of clusterStateProcessed events were triggered // assert the correct number of clusterStateProcessed events were triggered
for (Map.Entry<String, AtomicInteger> entry : counters.entrySet()) { for (Map.Entry<String, AtomicInteger> entry : processedStates.entrySet()) {
assertEquals(entry.getValue().get(), tasksSubmittedPerThread); assertThat(submittedTasksPerThread, hasKey(entry.getKey()));
assertEquals("not all tasks submitted by " + entry.getKey() + " received a processed event",
entry.getValue().get(), submittedTasksPerThread.get(entry.getKey()).get());
} }
} }
@ -506,9 +607,6 @@ public class ClusterServiceTests extends ESTestCase {
* Note, this test can only work as long as we have a single thread executor executing the state update tasks! * Note, this test can only work as long as we have a single thread executor executing the state update tasks!
*/ */
public void testPrioritizedTasks() throws Exception { public void testPrioritizedTasks() throws Exception {
Settings settings = Settings.builder()
.put("discovery.type", "local")
.build();
BlockingTask block = new BlockingTask(Priority.IMMEDIATE); BlockingTask block = new BlockingTask(Priority.IMMEDIATE);
clusterService.submitStateUpdateTask("test", block); clusterService.submitStateUpdateTask("test", block);
int taskCount = randomIntBetween(5, 20); int taskCount = randomIntBetween(5, 20);
@ -522,7 +620,7 @@ public class ClusterServiceTests extends ESTestCase {
clusterService.submitStateUpdateTask("test", new PrioritizedTask(priority, latch, tasks)); clusterService.submitStateUpdateTask("test", new PrioritizedTask(priority, latch, tasks));
} }
block.release(); block.close();
latch.await(); latch.await();
Priority prevPriority = null; Priority prevPriority = null;
@ -535,6 +633,40 @@ public class ClusterServiceTests extends ESTestCase {
} }
} }
public void testDuplicateSubmission() throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(2);
try (BlockingTask blockingTask = new BlockingTask(Priority.IMMEDIATE)) {
clusterService.submitStateUpdateTask("blocking", blockingTask);
ClusterStateTaskExecutor<SimpleTask> executor = (currentState, tasks) ->
ClusterStateTaskExecutor.BatchResult.<SimpleTask>builder().successes(tasks).build(currentState);
SimpleTask task = new SimpleTask(1);
ClusterStateTaskListener listener = new ClusterStateTaskListener() {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
latch.countDown();
}
@Override
public void onFailure(String source, Throwable t) {
fail(ExceptionsHelper.detailedMessage(t));
}
};
clusterService.submitStateUpdateTask("first time", task, ClusterStateTaskConfig.build(Priority.NORMAL), executor, listener);
expectThrows(IllegalArgumentException.class, () -> clusterService.submitStateUpdateTask("second time", task,
ClusterStateTaskConfig.build(Priority.NORMAL), executor, listener));
clusterService.submitStateUpdateTask("third time a charm", new SimpleTask(1),
ClusterStateTaskConfig.build(Priority.NORMAL), executor, listener);
assertThat(latch.getCount(), equalTo(2L));
}
latch.await();
}
@TestLogging("cluster:TRACE") // To ensure that we log cluster state events on TRACE level @TestLogging("cluster:TRACE") // To ensure that we log cluster state events on TRACE level
public void testClusterStateUpdateLogging() throws Exception { public void testClusterStateUpdateLogging() throws Exception {
MockLogAppender mockAppender = new MockLogAppender(); MockLogAppender mockAppender = new MockLogAppender();
@ -740,7 +872,25 @@ public class ClusterServiceTests extends ESTestCase {
mockAppender.assertAllExpectationsMatched(); mockAppender.assertAllExpectationsMatched();
} }
private static class BlockingTask extends ClusterStateUpdateTask { private static class SimpleTask {
private final int id;
private SimpleTask(int id) {
this.id = id;
}
@Override
public int hashCode() {
return super.hashCode();
}
@Override
public boolean equals(Object obj) {
return super.equals(obj);
}
}
private static class BlockingTask extends ClusterStateUpdateTask implements Releasable {
private final CountDownLatch latch = new CountDownLatch(1); private final CountDownLatch latch = new CountDownLatch(1);
public BlockingTask(Priority priority) { public BlockingTask(Priority priority) {
@ -757,7 +907,7 @@ public class ClusterServiceTests extends ESTestCase {
public void onFailure(String source, Throwable t) { public void onFailure(String source, Throwable t) {
} }
public void release() { public void close() {
latch.countDown(); latch.countDown();
} }

View File

@ -18,7 +18,6 @@
*/ */
package org.elasticsearch.discovery.zen; package org.elasticsearch.discovery.zen;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
@ -122,14 +121,14 @@ public class NodeJoinControllerTests extends ESTestCase {
nodes.add(node); nodes.add(node);
joinNode(node); joinNode(node);
} }
nodeJoinController.startAccumulatingJoins(); nodeJoinController.startElectionContext();
ArrayList<Future<Void>> pendingJoins = new ArrayList<>(); ArrayList<Future<Void>> pendingJoins = new ArrayList<>();
for (int i = randomInt(5); i > 0; i--) { for (int i = randomInt(5); i > 0; i--) {
DiscoveryNode node = newNode(nodeId++); DiscoveryNode node = newNode(nodeId++);
nodes.add(node); nodes.add(node);
pendingJoins.add(joinNodeAsync(node)); pendingJoins.add(joinNodeAsync(node));
} }
nodeJoinController.stopAccumulatingJoins("test"); nodeJoinController.stopElectionContext("test");
boolean hadSyncJoin = false; boolean hadSyncJoin = false;
for (int i = randomInt(5); i > 0; i--) { for (int i = randomInt(5); i > 0; i--) {
DiscoveryNode node = newNode(nodeId++); DiscoveryNode node = newNode(nodeId++);
@ -145,8 +144,6 @@ public class NodeJoinControllerTests extends ESTestCase {
for (Future<Void> joinFuture : pendingJoins) { for (Future<Void> joinFuture : pendingJoins) {
joinFuture.get(); joinFuture.get();
} }
assertNodesInCurrentState(nodes);
} }
public void testFailingJoinsWhenNotMaster() throws ExecutionException, InterruptedException { public void testFailingJoinsWhenNotMaster() throws ExecutionException, InterruptedException {
@ -163,14 +160,14 @@ public class NodeJoinControllerTests extends ESTestCase {
logger.debug("--> testing joins fail post accumulation"); logger.debug("--> testing joins fail post accumulation");
ArrayList<Future<Void>> pendingJoins = new ArrayList<>(); ArrayList<Future<Void>> pendingJoins = new ArrayList<>();
nodeJoinController.startAccumulatingJoins(); nodeJoinController.startElectionContext();
for (int i = 1 + randomInt(5); i > 0; i--) { for (int i = 1 + randomInt(5); i > 0; i--) {
DiscoveryNode node = newNode(nodeId++); DiscoveryNode node = newNode(nodeId++);
final Future<Void> future = joinNodeAsync(node); final Future<Void> future = joinNodeAsync(node);
pendingJoins.add(future); pendingJoins.add(future);
assertThat(future.isDone(), equalTo(false)); assertThat(future.isDone(), equalTo(false));
} }
nodeJoinController.stopAccumulatingJoins("test"); nodeJoinController.stopElectionContext("test");
for (Future<Void> future : pendingJoins) { for (Future<Void> future : pendingJoins) {
try { try {
future.get(); future.get();
@ -197,7 +194,7 @@ public class NodeJoinControllerTests extends ESTestCase {
} }
} }
nodeJoinController.startAccumulatingJoins(); nodeJoinController.startElectionContext();
final SimpleFuture electionFuture = new SimpleFuture("master election"); final SimpleFuture electionFuture = new SimpleFuture("master election");
final Thread masterElection = new Thread(new AbstractRunnable() { final Thread masterElection = new Thread(new AbstractRunnable() {
@Override @Override
@ -245,7 +242,7 @@ public class NodeJoinControllerTests extends ESTestCase {
} }
} }
nodeJoinController.startAccumulatingJoins(); nodeJoinController.startElectionContext();
final SimpleFuture electionFuture = new SimpleFuture("master election"); final SimpleFuture electionFuture = new SimpleFuture("master election");
final Thread masterElection = new Thread(new AbstractRunnable() { final Thread masterElection = new Thread(new AbstractRunnable() {
@Override @Override
@ -334,8 +331,8 @@ public class NodeJoinControllerTests extends ESTestCase {
} }
logger.debug("--> testing accumulation stopped"); logger.debug("--> testing accumulation stopped");
nodeJoinController.startAccumulatingJoins(); nodeJoinController.startElectionContext();
nodeJoinController.stopAccumulatingJoins("test"); nodeJoinController.stopElectionContext("test");
} }
@ -356,7 +353,7 @@ public class NodeJoinControllerTests extends ESTestCase {
} }
} }
nodeJoinController.startAccumulatingJoins(); nodeJoinController.startElectionContext();
final int initialJoins = randomIntBetween(0, requiredJoins - 1); final int initialJoins = randomIntBetween(0, requiredJoins - 1);
final ArrayList<SimpleFuture> pendingJoins = new ArrayList<>(); final ArrayList<SimpleFuture> pendingJoins = new ArrayList<>();
ArrayList<DiscoveryNode> nodesToJoin = new ArrayList<>(); ArrayList<DiscoveryNode> nodesToJoin = new ArrayList<>();
@ -389,7 +386,7 @@ public class NodeJoinControllerTests extends ESTestCase {
}); });
latch.await(); latch.await();
logger.debug("--> verifying election timed out"); logger.debug("--> verifying election timed out");
assertThat(failure.get(), instanceOf(ElasticsearchTimeoutException.class)); assertThat(failure.get(), instanceOf(NotMasterException.class));
logger.debug("--> verifying all joins are failed"); logger.debug("--> verifying all joins are failed");
for (SimpleFuture future : pendingJoins) { for (SimpleFuture future : pendingJoins) {
@ -457,7 +454,7 @@ public class NodeJoinControllerTests extends ESTestCase {
DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(clusterService.state().nodes()).masterNodeId(null); DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(clusterService.state().nodes()).masterNodeId(null);
setState(clusterService, ClusterState.builder(clusterService.state()).nodes(nodesBuilder)); setState(clusterService, ClusterState.builder(clusterService.state()).nodes(nodesBuilder));
nodeJoinController.startAccumulatingJoins(); nodeJoinController.startElectionContext();
Thread[] threads = new Thread[3 + randomInt(5)]; Thread[] threads = new Thread[3 + randomInt(5)];
final int requiredJoins = randomInt(threads.length); final int requiredJoins = randomInt(threads.length);
@ -589,7 +586,9 @@ public class NodeJoinControllerTests extends ESTestCase {
private SimpleFuture joinNodeAsync(final DiscoveryNode node) throws InterruptedException { private SimpleFuture joinNodeAsync(final DiscoveryNode node) throws InterruptedException {
final SimpleFuture future = new SimpleFuture("join of " + node + " (id [" + joinId.incrementAndGet() + "]"); final SimpleFuture future = new SimpleFuture("join of " + node + " (id [" + joinId.incrementAndGet() + "]");
logger.debug("starting {}", future); logger.debug("starting {}", future);
nodeJoinController.handleJoinRequest(node, new MembershipAction.JoinCallback() { // clone the node before submitting to simulate an incoming join, which is guaranteed to have a new
// disco node object serialized off the network
nodeJoinController.handleJoinRequest(cloneNode(node), new MembershipAction.JoinCallback() {
@Override @Override
public void onSuccess() { public void onSuccess() {
logger.debug("{} completed", future); logger.debug("{} completed", future);
@ -605,6 +604,14 @@ public class NodeJoinControllerTests extends ESTestCase {
return future; return future;
} }
/**
* creates an object clone of node, so it will be a different object instance
*/
private DiscoveryNode cloneNode(DiscoveryNode node) {
return new DiscoveryNode(node.getName(), node.getId(), node.getHostName(), node.getHostAddress(), node.getAddress(),
node.getAttributes(), node.getRoles(), node.getVersion());
}
private void joinNode(final DiscoveryNode node) throws InterruptedException, ExecutionException { private void joinNode(final DiscoveryNode node) throws InterruptedException, ExecutionException {
joinNodeAsync(node).get(); joinNodeAsync(node).get();
} }