Decouple the TransportService and ClusterService #16872

Currently, the cluster service is tightly coupled to the transport service by both managing node connections and requiring the bound address in order to create the local disco node. This commit introduces a new NodeConnectionsService which is in charge of node connection management and makes it possible to remove all network related calls from the cluster service. The local DiscoNode is now created by DiscoveryNodeService and is set both the cluster service and the transport service during node start up.

Closes #16788
Closes #16872
This commit is contained in:
Boaz Leskes 2016-02-27 18:48:42 +01:00
parent b4db26eaf9
commit cd12241e9f
22 changed files with 1423 additions and 963 deletions

View File

@ -136,6 +136,7 @@ public class ClusterModule extends AbstractModule {
bind(AllocationService.class).asEagerSingleton();
bind(DiscoveryNodeService.class).asEagerSingleton();
bind(ClusterService.class).to(InternalClusterService.class).asEagerSingleton();
bind(NodeConnectionsService.class).asEagerSingleton();
bind(OperationRouting.class).asEagerSingleton();
bind(MetaDataCreateIndexService.class).asEagerSingleton();
bind(MetaDataDeleteIndexService.class).asEagerSingleton();

View File

@ -26,7 +26,6 @@ import org.elasticsearch.cluster.service.PendingClusterTask;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.tasks.TaskManager;
import java.util.List;
@ -154,9 +153,4 @@ public interface ClusterService extends LifecycleComponent<ClusterService> {
* @return A zero time value if the queue is empty, otherwise the time value oldest task waiting in the queue
*/
TimeValue getMaxTaskWaitTime();
/**
* Returns task manager created in the cluster service
*/
TaskManager getTaskManager();
}

View File

@ -0,0 +1,156 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.util.concurrent.KeyedLock;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledFuture;
/**
* This component is responsible for connecting to nodes once they are added to the cluster state, and disconnect when they are
* removed. Also, it periodically checks that all connections are still open and if needed restores them.
* Note that this component is *not* responsible for removing nodes from the cluster if they disconnect / do not respond
* to pings. This is done by {@link org.elasticsearch.discovery.zen.fd.NodesFaultDetection}. Master fault detection
* is done by {@link org.elasticsearch.discovery.zen.fd.MasterFaultDetection}.
*/
public class NodeConnectionsService extends AbstractLifecycleComponent<NodeConnectionsService> {
public static final Setting<TimeValue> CLUSTER_NODE_RECONNECT_INTERVAL_SETTING =
Setting.positiveTimeSetting("cluster.nodes.reconnect_interval", TimeValue.timeValueSeconds(10), false, Setting.Scope.CLUSTER);
private final ThreadPool threadPool;
private final TransportService transportService;
// map between current node and the number of failed connection attempts. 0 means successfully connected.
// if a node doesn't appear in this list it shouldn't be monitored
private ConcurrentMap<DiscoveryNode, Integer> nodes = ConcurrentCollections.newConcurrentMap();
final private KeyedLock<DiscoveryNode> nodeLocks = new KeyedLock<>();
private final TimeValue reconnectInterval;
private volatile ScheduledFuture<?> backgroundFuture = null;
@Inject
public NodeConnectionsService(Settings settings, ThreadPool threadPool, TransportService transportService) {
super(settings);
this.threadPool = threadPool;
this.transportService = transportService;
this.reconnectInterval = NodeConnectionsService.CLUSTER_NODE_RECONNECT_INTERVAL_SETTING.get(settings);
}
public void connectToAddedNodes(ClusterChangedEvent event) {
// TODO: do this in parallel (and wait)
for (final DiscoveryNode node : event.nodesDelta().addedNodes()) {
try (Releasable ignored = nodeLocks.acquire(node)) {
Integer current = nodes.put(node, 0);
assert current == null : "node " + node + " was added in event but already in internal nodes";
validateNodeConnected(node);
}
}
}
public void disconnectFromRemovedNodes(ClusterChangedEvent event) {
for (final DiscoveryNode node : event.nodesDelta().removedNodes()) {
try (Releasable ignored = nodeLocks.acquire(node)) {
Integer current = nodes.remove(node);
assert current != null : "node " + node + " was removed in event but not in internal nodes";
try {
transportService.disconnectFromNode(node);
} catch (Throwable e) {
logger.warn("failed to disconnect to node [" + node + "]", e);
}
}
}
}
void validateNodeConnected(DiscoveryNode node) {
assert nodeLocks.isHeldByCurrentThread(node) : "validateNodeConnected must be called under lock";
if (lifecycle.stoppedOrClosed() ||
nodes.containsKey(node) == false) { // we double check existence of node since connectToNode might take time...
// nothing to do
} else {
try {
// connecting to an already connected node is a noop
transportService.connectToNode(node);
nodes.put(node, 0);
} catch (Exception e) {
Integer nodeFailureCount = nodes.get(node);
assert nodeFailureCount != null : node + " didn't have a counter in nodes map";
nodeFailureCount = nodeFailureCount + 1;
// log every 6th failure
if ((nodeFailureCount % 6) == 1) {
logger.warn("failed to connect to node {} (tried [{}] times)", e, node, nodeFailureCount);
}
nodes.put(node, nodeFailureCount);
}
}
}
class ConnectionChecker extends AbstractRunnable {
@Override
public void onFailure(Throwable t) {
logger.warn("unexpected error while checking for node reconnects", t);
}
protected void doRun() {
for (DiscoveryNode node : nodes.keySet()) {
try (Releasable ignored = nodeLocks.acquire(node)) {
validateNodeConnected(node);
}
}
}
@Override
public void onAfter() {
if (lifecycle.started()) {
backgroundFuture = threadPool.schedule(reconnectInterval, ThreadPool.Names.GENERIC, this);
}
}
}
@Override
protected void doStart() {
backgroundFuture = threadPool.schedule(reconnectInterval, ThreadPool.Names.GENERIC, new ConnectionChecker());
}
@Override
protected void doStop() {
FutureUtils.cancel(backgroundFuture);
}
@Override
protected void doClose() {
}
}

View File

@ -19,24 +19,40 @@
package org.elasticsearch.cluster.node;
import org.elasticsearch.Version;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CopyOnWriteArrayList;
/**
*/
public class DiscoveryNodeService extends AbstractComponent {
public static final Setting<Long> NODE_ID_SEED_SETTING =
// don't use node.id.seed so it won't be seen as an attribute
Setting.longSetting("node_id.seed", 0L, Long.MIN_VALUE, false, Setting.Scope.CLUSTER);
private final List<CustomAttributesProvider> customAttributesProviders = new CopyOnWriteArrayList<>();
private final Version version;
@Inject
public DiscoveryNodeService(Settings settings) {
public DiscoveryNodeService(Settings settings, Version version) {
super(settings);
this.version = version;
}
public static String generateNodeId(Settings settings) {
Random random = Randomness.get(settings, NODE_ID_SEED_SETTING);
return Strings.randomBase64UUID(random);
}
public DiscoveryNodeService addCustomAttributeProvider(CustomAttributesProvider customAttributesProvider) {
@ -44,7 +60,7 @@ public class DiscoveryNodeService extends AbstractComponent {
return this;
}
public Map<String, String> buildAttributes() {
public DiscoveryNode buildLocalNode(TransportAddress publishAddress) {
Map<String, String> attributes = new HashMap<>(settings.getByPrefix("node.").getAsMap());
attributes.remove("name"); // name is extracted in other places
if (attributes.containsKey("client")) {
@ -76,10 +92,11 @@ public class DiscoveryNodeService extends AbstractComponent {
}
}
return attributes;
final String nodeId = generateNodeId(settings);
return new DiscoveryNode(settings.get("node.name"), nodeId, publishAddress, attributes, version);
}
public static interface CustomAttributesProvider {
public interface CustomAttributesProvider {
Map<String, String> buildAttributes();
}

View File

@ -19,7 +19,6 @@
package org.elasticsearch.cluster.service;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.AckedClusterStateTaskListener;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
@ -32,19 +31,18 @@ import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.LocalNodeMasterListener;
import org.elasticsearch.cluster.NodeConnectionsService;
import org.elasticsearch.cluster.TimeoutClusterStateListener;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.ProcessClusterEventTimeoutException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeService;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.OperationRouting;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
@ -54,7 +52,6 @@ import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.CountDown;
@ -65,9 +62,7 @@ import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
import org.elasticsearch.common.util.concurrent.PrioritizedRunnable;
import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.ArrayList;
import java.util.Collection;
@ -78,8 +73,6 @@ import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
@ -97,25 +90,15 @@ import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadF
public class InternalClusterService extends AbstractLifecycleComponent<ClusterService> implements ClusterService {
public static final Setting<TimeValue> CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING = Setting.positiveTimeSetting("cluster.service.slow_task_logging_threshold", TimeValue.timeValueSeconds(30), true, Setting.Scope.CLUSTER);
public static final Setting<TimeValue> CLUSTER_SERVICE_RECONNECT_INTERVAL_SETTING = Setting.positiveTimeSetting("cluster.service.reconnect_interval", TimeValue.timeValueSeconds(10), false, Setting.Scope.CLUSTER);
public static final String UPDATE_THREAD_NAME = "clusterService#updateTask";
public static final Setting<Long> NODE_ID_SEED_SETTING =
// don't use node.id.seed so it won't be seen as an attribute
Setting.longSetting("node_id.seed", 0L, Long.MIN_VALUE, false, Setting.Scope.CLUSTER);
private final ThreadPool threadPool;
private BiConsumer<ClusterChangedEvent, Discovery.AckListener> clusterStatePublisher;
private final OperationRouting operationRouting;
private final TransportService transportService;
private final ClusterSettings clusterSettings;
private final DiscoveryNodeService discoveryNodeService;
private final Version version;
private final TimeValue reconnectInterval;
private TimeValue slowTaskLoggingThreshold;
@ -140,47 +123,49 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
private final ClusterBlocks.Builder initialBlocks;
private final TaskManager taskManager;
private volatile ScheduledFuture reconnectToNodes;
private NodeConnectionsService nodeConnectionsService;
@Inject
public InternalClusterService(Settings settings, OperationRouting operationRouting, TransportService transportService,
ClusterSettings clusterSettings, ThreadPool threadPool, ClusterName clusterName, DiscoveryNodeService discoveryNodeService, Version version) {
public InternalClusterService(Settings settings, OperationRouting operationRouting,
ClusterSettings clusterSettings, ThreadPool threadPool, ClusterName clusterName) {
super(settings);
this.operationRouting = operationRouting;
this.transportService = transportService;
this.threadPool = threadPool;
this.clusterSettings = clusterSettings;
this.discoveryNodeService = discoveryNodeService;
this.version = version;
// will be replaced on doStart.
this.clusterState = ClusterState.builder(clusterName).build();
this.clusterSettings.addSettingsUpdateConsumer(CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, this::setSlowTaskLoggingThreshold);
this.reconnectInterval = CLUSTER_SERVICE_RECONNECT_INTERVAL_SETTING.get(settings);
this.slowTaskLoggingThreshold = CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(settings);
localNodeMasterListeners = new LocalNodeMasterListeners(threadPool);
initialBlocks = ClusterBlocks.builder();
taskManager = transportService.getTaskManager();
}
private void setSlowTaskLoggingThreshold(TimeValue slowTaskLoggingThreshold) {
this.slowTaskLoggingThreshold = slowTaskLoggingThreshold;
}
public void setClusterStatePublisher(BiConsumer<ClusterChangedEvent, Discovery.AckListener> publisher) {
synchronized public void setClusterStatePublisher(BiConsumer<ClusterChangedEvent, Discovery.AckListener> publisher) {
clusterStatePublisher = publisher;
}
synchronized public void setLocalNode(DiscoveryNode localNode) {
assert clusterState.nodes().localNodeId() == null : "local node is already set";
DiscoveryNodes.Builder nodeBuilder = DiscoveryNodes.builder(clusterState.nodes()).put(localNode).localNodeId(localNode.id());
this.clusterState = ClusterState.builder(clusterState).nodes(nodeBuilder).build();
}
synchronized public void setNodeConnectionsService(NodeConnectionsService nodeConnectionsService) {
assert this.nodeConnectionsService == null : "nodeConnectionsService is already set";
this.nodeConnectionsService = nodeConnectionsService;
}
@Override
public void addInitialStateBlock(ClusterBlock block) throws IllegalStateException {
synchronized public void addInitialStateBlock(ClusterBlock block) throws IllegalStateException {
if (lifecycle.started()) {
throw new IllegalStateException("can't set initial block when started");
}
@ -188,12 +173,12 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
}
@Override
public void removeInitialStateBlock(ClusterBlock block) throws IllegalStateException {
synchronized public void removeInitialStateBlock(ClusterBlock block) throws IllegalStateException {
removeInitialStateBlock(block.id());
}
@Override
public void removeInitialStateBlock(int blockId) throws IllegalStateException {
synchronized public void removeInitialStateBlock(int blockId) throws IllegalStateException {
if (lifecycle.started()) {
throw new IllegalStateException("can't set initial block when started");
}
@ -201,26 +186,18 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
}
@Override
protected void doStart() {
synchronized protected void doStart() {
Objects.requireNonNull(clusterStatePublisher, "please set a cluster state publisher before starting");
Objects.requireNonNull(clusterState.nodes().localNode(), "please set the local node before starting");
Objects.requireNonNull(nodeConnectionsService, "please set the node connection service before starting");
add(localNodeMasterListeners);
add(taskManager);
this.clusterState = ClusterState.builder(clusterState).blocks(initialBlocks).build();
this.updateTasksExecutor = EsExecutors.newSinglePrioritizing(UPDATE_THREAD_NAME, daemonThreadFactory(settings, UPDATE_THREAD_NAME), threadPool.getThreadContext());
this.reconnectToNodes = threadPool.schedule(reconnectInterval, ThreadPool.Names.GENERIC, new ReconnectToNodes());
Map<String, String> nodeAttributes = discoveryNodeService.buildAttributes();
// note, we rely on the fact that its a new id each time we start, see FD and "kill -9" handling
final String nodeId = generateNodeId(settings);
final TransportAddress publishAddress = transportService.boundAddress().publishAddress();
DiscoveryNode localNode = new DiscoveryNode(settings.get("node.name"), nodeId, publishAddress, nodeAttributes, version);
DiscoveryNodes.Builder nodeBuilder = DiscoveryNodes.builder().put(localNode).localNodeId(localNode.id());
this.clusterState = ClusterState.builder(clusterState).nodes(nodeBuilder).blocks(initialBlocks).build();
this.transportService.setLocalNode(localNode);
this.clusterState = ClusterState.builder(clusterState).blocks(initialBlocks).build();
}
@Override
protected void doStop() {
FutureUtils.cancel(this.reconnectToNodes);
synchronized protected void doStop() {
for (NotifyTimeout onGoingTimeout : onGoingTimeouts) {
onGoingTimeout.cancel();
onGoingTimeout.listener.onClose();
@ -230,7 +207,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
}
@Override
protected void doClose() {
synchronized protected void doClose() {
}
@Override
@ -400,11 +377,6 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
return updateTasksExecutor.getMaxTaskWaitTime();
}
@Override
public TaskManager getTaskManager() {
return taskManager;
}
/** asserts that the current thread is the cluster state update thread */
public boolean assertClusterStateThread() {
assert Thread.currentThread().getName().contains(InternalClusterService.UPDATE_THREAD_NAME) : "not called from the cluster state update thread";
@ -457,14 +429,14 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
return;
}
ClusterStateTaskExecutor.BatchResult<T> batchResult;
long startTimeNS = System.nanoTime();
long startTimeNS = currentTimeInNanos();
try {
List<T> inputs = toExecute.stream().map(tUpdateTask -> tUpdateTask.task).collect(Collectors.toList());
batchResult = executor.execute(previousClusterState, inputs);
} catch (Throwable e) {
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startTimeNS)));
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
if (logger.isTraceEnabled()) {
StringBuilder sb = new StringBuilder("failed to execute cluster state update in ").append(executionTime).append(", state:\nversion [").append(previousClusterState.version()).append("], source [").append(source).append("]\n");
StringBuilder sb = new StringBuilder("failed to execute cluster state update in [").append(executionTime).append("], state:\nversion [").append(previousClusterState.version()).append("], source [").append(source).append("]\n");
sb.append(previousClusterState.nodes().prettyPrint());
sb.append(previousClusterState.routingTable().prettyPrint());
sb.append(previousClusterState.getRoutingNodes().prettyPrint());
@ -509,8 +481,8 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
}
task.listener.clusterStateProcessed(task.source, previousClusterState, newClusterState);
}
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startTimeNS)));
logger.debug("processing [{}]: took {} no change in cluster_state", source, executionTime);
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
logger.debug("processing [{}]: took [{}] no change in cluster_state", source, executionTime);
warnAboutSlowTaskIfNeeded(executionTime, source);
return;
}
@ -568,15 +540,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
}
}
// TODO, do this in parallel (and wait)
for (DiscoveryNode node : nodesDelta.addedNodes()) {
try {
transportService.connectToNode(node);
} catch (Throwable e) {
// the fault detection will detect it as failed as well
logger.warn("failed to connect to node [" + node + "]", e);
}
}
nodeConnectionsService.connectToAddedNodes(clusterChangedEvent);
// if we are the master, publish the new state to all nodes
// we publish here before we send a notification to all the listeners, since if it fails
@ -612,13 +576,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
}
}
for (DiscoveryNode node : nodesDelta.removedNodes()) {
try {
transportService.disconnectFromNode(node);
} catch (Throwable e) {
logger.warn("failed to disconnect to node [" + node + "]", e);
}
}
nodeConnectionsService.disconnectFromRemovedNodes(clusterChangedEvent);
newClusterState.status(ClusterState.ClusterStateStatus.APPLIED);
@ -649,11 +607,11 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
logger.error("exception thrown while notifying executor of new cluster state publication [{}]", e, source);
}
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startTimeNS)));
logger.debug("processing [{}]: took {} done applying updated cluster_state (version: {}, uuid: {})", source, executionTime, newClusterState.version(), newClusterState.stateUUID());
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
logger.debug("processing [{}]: took [{}] done applying updated cluster_state (version: {}, uuid: {})", source, executionTime, newClusterState.version(), newClusterState.stateUUID());
warnAboutSlowTaskIfNeeded(executionTime, source);
} catch (Throwable t) {
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startTimeNS)));
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
StringBuilder sb = new StringBuilder("failed to apply updated cluster state in ").append(executionTime).append(":\nversion [").append(newClusterState.version()).append("], uuid [").append(newClusterState.stateUUID()).append("], source [").append(source).append("]\n");
sb.append(newClusterState.nodes().prettyPrint());
sb.append(newClusterState.routingTable().prettyPrint());
@ -664,6 +622,9 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
}
// this one is overridden in tests so we can control time
protected long currentTimeInNanos() {return System.nanoTime();}
private static SafeClusterStateTaskListener safe(ClusterStateTaskListener listener, ESLogger logger) {
if (listener instanceof AckedClusterStateTaskListener) {
return new SafeAckedClusterStateTaskListener((AckedClusterStateTaskListener) listener, logger);
@ -777,7 +738,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
private void warnAboutSlowTaskIfNeeded(TimeValue executionTime, String source) {
if (executionTime.getMillis() > slowTaskLoggingThreshold.getMillis()) {
logger.warn("cluster state update task [{}] took {} above the warn threshold of {}", source, executionTime, slowTaskLoggingThreshold);
logger.warn("cluster state update task [{}] took [{}] above the warn threshold of {}", source, executionTime, slowTaskLoggingThreshold);
}
}
@ -809,64 +770,6 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
}
}
private class ReconnectToNodes implements Runnable {
private ConcurrentMap<DiscoveryNode, Integer> failureCount = ConcurrentCollections.newConcurrentMap();
@Override
public void run() {
// master node will check against all nodes if its alive with certain discoveries implementations,
// but we can't rely on that, so we check on it as well
for (DiscoveryNode node : clusterState.nodes()) {
if (lifecycle.stoppedOrClosed()) {
return;
}
if (clusterState.nodes().nodeExists(node.id())) { // we double check existence of node since connectToNode might take time...
if (!transportService.nodeConnected(node)) {
try {
transportService.connectToNode(node);
} catch (Exception e) {
if (lifecycle.stoppedOrClosed()) {
return;
}
if (clusterState.nodes().nodeExists(node.id())) { // double check here as well, maybe its gone?
Integer nodeFailureCount = failureCount.get(node);
if (nodeFailureCount == null) {
nodeFailureCount = 1;
} else {
nodeFailureCount = nodeFailureCount + 1;
}
// log every 6th failure
if ((nodeFailureCount % 6) == 0) {
// reset the failure count...
nodeFailureCount = 0;
logger.warn("failed to reconnect to node {}", e, node);
}
failureCount.put(node, nodeFailureCount);
}
}
}
}
}
// go over and remove failed nodes that have been removed
DiscoveryNodes nodes = clusterState.nodes();
for (Iterator<DiscoveryNode> failedNodesIt = failureCount.keySet().iterator(); failedNodesIt.hasNext(); ) {
DiscoveryNode failedNode = failedNodesIt.next();
if (!nodes.nodeExists(failedNode.id())) {
failedNodesIt.remove();
}
}
if (lifecycle.started()) {
reconnectToNodes = threadPool.schedule(reconnectInterval, ThreadPool.Names.GENERIC, this);
}
}
}
public static String generateNodeId(Settings settings) {
Random random = Randomness.get(settings, NODE_ID_SEED_SETTING);
return Strings.randomBase64UUID(random);
}
private static class LocalNodeMasterListeners implements ClusterStateListener {
private final List<LocalNodeMasterListener> listeners = new CopyOnWriteArrayList<>();

View File

@ -29,8 +29,10 @@ import org.elasticsearch.client.transport.TransportClientNodesService;
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.InternalClusterInfoService;
import org.elasticsearch.cluster.NodeConnectionsService;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodeService;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
@ -259,7 +261,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
TransportService.TRACE_LOG_INCLUDE_SETTING,
TransportCloseIndexAction.CLUSTER_INDICES_CLOSE_ENABLE_SETTING,
ShardsLimitAllocationDecider.CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING,
InternalClusterService.CLUSTER_SERVICE_RECONNECT_INTERVAL_SETTING,
NodeConnectionsService.CLUSTER_NODE_RECONNECT_INTERVAL_SETTING,
HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_TYPE_SETTING,
HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_TYPE_SETTING,
Transport.TRANSPORT_TCP_COMPRESS,
@ -326,7 +328,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
Environment.PATH_SCRIPTS_SETTING,
Environment.PATH_SHARED_DATA_SETTING,
Environment.PIDFILE_SETTING,
InternalClusterService.NODE_ID_SEED_SETTING,
DiscoveryNodeService.NODE_ID_SEED_SETTING,
DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING,
DiscoveryModule.DISCOVERY_TYPE_SETTING,
DiscoveryModule.ZEN_MASTER_SERVICE_TYPE_SETTING,

View File

@ -20,7 +20,10 @@
package org.elasticsearch.common.util.concurrent;
import org.elasticsearch.common.lease.Releasable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
@ -30,7 +33,6 @@ import java.util.concurrent.locks.ReentrantLock;
* lock. The latter is important to assure that the list of locks does not grow
* infinitely.
*
* A Thread can acquire a lock only once.
*
* */
public class KeyedLock<T> {
@ -50,48 +52,38 @@ public class KeyedLock<T> {
private final ConcurrentMap<T, KeyLock> map = ConcurrentCollections.newConcurrentMap();
protected final ThreadLocal<KeyLock> threadLocal = new ThreadLocal<>();
public void acquire(T key) {
public Releasable acquire(T key) {
assert isHeldByCurrentThread(key) == false : "lock for " + key + " is already heald by this thread";
while (true) {
if (threadLocal.get() != null) {
// if we are here, the thread already has the lock
throw new IllegalStateException("Lock already acquired in Thread" + Thread.currentThread().getId()
+ " for key " + key);
}
KeyLock perNodeLock = map.get(key);
if (perNodeLock == null) {
KeyLock newLock = new KeyLock(fair);
perNodeLock = map.putIfAbsent(key, newLock);
if (perNodeLock == null) {
newLock.lock();
threadLocal.set(newLock);
return;
return new ReleasableLock(key, newLock);
}
}
assert perNodeLock != null;
int i = perNodeLock.count.get();
if (i > 0 && perNodeLock.count.compareAndSet(i, i + 1)) {
perNodeLock.lock();
threadLocal.set(perNodeLock);
return;
return new ReleasableLock(key, perNodeLock);
}
}
}
public void release(T key) {
KeyLock lock = threadLocal.get();
public boolean isHeldByCurrentThread(T key) {
KeyLock lock = map.get(key);
if (lock == null) {
throw new IllegalStateException("Lock not acquired");
return false;
}
release(key, lock);
return lock.isHeldByCurrentThread();
}
void release(T key, KeyLock lock) {
assert lock.isHeldByCurrentThread();
assert lock == map.get(key);
lock.unlock();
threadLocal.set(null);
int decrementAndGet = lock.count.decrementAndGet();
if (decrementAndGet == 0) {
map.remove(key, lock);
@ -99,6 +91,24 @@ public class KeyedLock<T> {
}
private final class ReleasableLock implements Releasable {
final T key;
final KeyLock lock;
final AtomicBoolean closed = new AtomicBoolean();
private ReleasableLock(T key, KeyLock lock) {
this.key = key;
this.lock = lock;
}
@Override
public void close() {
if (closed.compareAndSet(false, true)) {
release(key, lock);
}
}
}
@SuppressWarnings("serial")
private final static class KeyLock extends ReentrantLock {
KeyLock(boolean fair) {

View File

@ -34,8 +34,10 @@ import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.MasterNodeChangePredicate;
import org.elasticsearch.cluster.NodeConnectionsService;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeService;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.service.InternalClusterService;
import org.elasticsearch.common.StopWatch;
@ -294,6 +296,10 @@ public class Node implements Closeable {
"node cluster service implementation must inherit from InternalClusterService";
final InternalClusterService clusterService = (InternalClusterService) injector.getInstance(ClusterService.class);
final NodeConnectionsService nodeConnectionsService = injector.getInstance(NodeConnectionsService.class);
nodeConnectionsService.start();
clusterService.setNodeConnectionsService(nodeConnectionsService);
// TODO hack around circular dependencies problems
injector.getInstance(GatewayAllocator.class).setReallocation(clusterService, injector.getInstance(RoutingService.class));
@ -311,6 +317,15 @@ public class Node implements Closeable {
// Start the transport service now so the publish address will be added to the local disco node in ClusterService
TransportService transportService = injector.getInstance(TransportService.class);
transportService.start();
DiscoveryNode localNode = injector.getInstance(DiscoveryNodeService.class)
.buildLocalNode(transportService.boundAddress().publishAddress());
// TODO: need to find a cleaner way to start/construct a service with some initial parameters,
// playing nice with the life cycle interfaces
clusterService.setLocalNode(localNode);
transportService.setLocalNode(localNode);
clusterService.add(transportService.getTaskManager());
clusterService.start();
// start after cluster service so the local disco is known
@ -392,6 +407,7 @@ public class Node implements Closeable {
injector.getInstance(RoutingService.class).stop();
injector.getInstance(ClusterService.class).stop();
injector.getInstance(Discovery.class).stop();
injector.getInstance(NodeConnectionsService.class).stop();
injector.getInstance(MonitorService.class).stop();
injector.getInstance(GatewayService.class).stop();
injector.getInstance(SearchService.class).stop();
@ -449,6 +465,8 @@ public class Node implements Closeable {
toClose.add(injector.getInstance(RoutingService.class));
toClose.add(() -> stopWatch.stop().start("cluster"));
toClose.add(injector.getInstance(ClusterService.class));
toClose.add(() -> stopWatch.stop().start("node_connections_service"));
toClose.add(injector.getInstance(NodeConnectionsService.class));
toClose.add(() -> stopWatch.stop().start("discovery"));
toClose.add(injector.getInstance(Discovery.class));
toClose.add(() -> stopWatch.stop().start("monitor"));

View File

@ -33,6 +33,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.math.MathUtils;
import org.elasticsearch.common.metrics.CounterMetric;
@ -943,8 +944,8 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
}
globalLock.readLock().lock();
try {
connectionLock.acquire(node.id());
try {
try (Releasable ignored = connectionLock.acquire(node.id())) {
if (!lifecycle.started()) {
throw new IllegalStateException("can't add nodes to a stopped transport");
}
@ -979,8 +980,6 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
} catch (Exception e) {
throw new ConnectTransportException(node, "general node connection failure", e);
}
} finally {
connectionLock.release(node.id());
}
} finally {
globalLock.readLock().unlock();
@ -1103,8 +1102,8 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
@Override
public void disconnectFromNode(DiscoveryNode node) {
connectionLock.acquire(node.id());
try {
try (Releasable ignored = connectionLock.acquire(node.id())) {
NodeChannels nodeChannels = connectedNodes.remove(node);
if (nodeChannels != null) {
try {
@ -1115,8 +1114,6 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
transportServiceAdapter.raiseNodeDisconnected(node);
}
}
} finally {
connectionLock.release(node.id());
}
}
@ -1128,8 +1125,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
// check outside of the lock
NodeChannels nodeChannels = connectedNodes.get(node);
if (nodeChannels != null && nodeChannels.hasChannel(channel)) {
connectionLock.acquire(node.id());
try {
try (Releasable ignored = connectionLock.acquire(node.id())) {
nodeChannels = connectedNodes.get(node);
// check again within the connection lock, if its still applicable to remove it
if (nodeChannels != null && nodeChannels.hasChannel(channel)) {
@ -1143,8 +1139,6 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
}
return true;
}
} finally {
connectionLock.release(node.id());
}
}
return false;

View File

@ -194,7 +194,7 @@ public abstract class TaskManagerTestCase extends ESTestCase {
}
};
transportService.start();
clusterService = new TestClusterService(threadPool, transportService);
clusterService = new TestClusterService(threadPool);
clusterService.add(transportService.getTaskManager());
discoveryNode = new DiscoveryNode(name, transportService.boundAddress().publishAddress(), Version.CURRENT);
IndexNameExpressionResolver indexNameExpressionResolver = new IndexNameExpressionResolver(settings);
@ -238,7 +238,7 @@ public abstract class TaskManagerTestCase extends ESTestCase {
RecordingTaskManagerListener[] listeners = new RecordingTaskManagerListener[nodes.length];
for (int i = 0; i < nodes.length; i++) {
listeners[i] = new RecordingTaskManagerListener(nodes[i].discoveryNode, actionMasks);
((MockTaskManager) (nodes[i].clusterService.getTaskManager())).addListener(listeners[i]);
((MockTaskManager) (nodes[i].transportService.getTaskManager())).addListener(listeners[i]);
}
return listeners;
}

View File

@ -43,6 +43,7 @@ import org.elasticsearch.test.tasks.MockTaskManager;
import org.elasticsearch.test.tasks.MockTaskManagerListener;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.ReceiveTimeoutTransportException;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.ArrayList;
@ -263,8 +264,8 @@ public class TasksIT extends ESIntegTestCase {
ReentrantLock taskFinishLock = new ReentrantLock();
taskFinishLock.lock();
CountDownLatch taskRegistered = new CountDownLatch(1);
for (ClusterService clusterService : internalCluster().getInstances(ClusterService.class)) {
((MockTaskManager)clusterService.getTaskManager()).addListener(new MockTaskManagerListener() {
for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
((MockTaskManager) transportService.getTaskManager()).addListener(new MockTaskManagerListener() {
@Override
public void onTaskRegistered(Task task) {
if (task.getAction().startsWith(IndexAction.NAME)) {
@ -408,7 +409,7 @@ public class TasksIT extends ESIntegTestCase {
@Override
public void tearDown() throws Exception {
for (Map.Entry<Tuple<String, String>, RecordingTaskManagerListener> entry : listeners.entrySet()) {
((MockTaskManager)internalCluster().getInstance(ClusterService.class, entry.getKey().v1()).getTaskManager()).removeListener(entry.getValue());
((MockTaskManager) internalCluster().getInstance(TransportService.class, entry.getKey().v1()).getTaskManager()).removeListener(entry.getValue());
}
listeners.clear();
super.tearDown();
@ -418,10 +419,10 @@ public class TasksIT extends ESIntegTestCase {
* Registers recording task event listeners with the given action mask on all nodes
*/
private void registerTaskManageListeners(String actionMasks) {
for (ClusterService clusterService : internalCluster().getInstances(ClusterService.class)) {
DiscoveryNode node = clusterService.localNode();
for (String nodeName : internalCluster().getNodeNames()) {
DiscoveryNode node = internalCluster().getInstance(ClusterService.class, nodeName).localNode();
RecordingTaskManagerListener listener = new RecordingTaskManagerListener(node, Strings.splitStringToArray(actionMasks, ','));
((MockTaskManager)clusterService.getTaskManager()).addListener(listener);
((MockTaskManager) internalCluster().getInstance(TransportService.class, nodeName).getTaskManager()).addListener(listener);
RecordingTaskManagerListener oldListener = listeners.put(new Tuple<>(node.name(), actionMasks), listener);
assertNull(oldListener);
}

View File

@ -18,16 +18,12 @@
*/
package org.elasticsearch.cluster;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.InternalClusterService;
import org.elasticsearch.cluster.service.PendingClusterTask;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.inject.Inject;
@ -35,38 +31,24 @@ import org.elasticsearch.common.inject.Singleton;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.node.Node;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.MockLogAppender;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@ -85,74 +67,6 @@ public class ClusterServiceIT extends ESIntegTestCase {
return pluginList(TestPlugin.class);
}
public void testTimeoutUpdateTask() throws Exception {
Settings settings = settingsBuilder()
.put("discovery.type", "local")
.build();
internalCluster().startNode(settings);
ClusterService clusterService1 = internalCluster().getInstance(ClusterService.class);
final CountDownLatch block = new CountDownLatch(1);
clusterService1.submitStateUpdateTask("test1", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
try {
block.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return currentState;
}
@Override
public void onFailure(String source, Throwable t) {
throw new RuntimeException(t);
}
});
final CountDownLatch timedOut = new CountDownLatch(1);
final AtomicBoolean executeCalled = new AtomicBoolean();
clusterService1.submitStateUpdateTask("test2", new ClusterStateUpdateTask() {
@Override
public TimeValue timeout() {
return TimeValue.timeValueMillis(2);
}
@Override
public void onFailure(String source, Throwable t) {
timedOut.countDown();
}
@Override
public ClusterState execute(ClusterState currentState) {
executeCalled.set(true);
return currentState;
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
}
});
timedOut.await();
block.countDown();
final CountDownLatch allProcessed = new CountDownLatch(1);
clusterService1.submitStateUpdateTask("test3", new ClusterStateUpdateTask() {
@Override
public void onFailure(String source, Throwable t) {
throw new RuntimeException(t);
}
@Override
public ClusterState execute(ClusterState currentState) {
allProcessed.countDown();
return currentState;
}
});
allProcessed.await(); // executed another task to double check that execute on the timed out update task is not called...
assertThat(executeCalled.get(), equalTo(false));
}
public void testAckedUpdateTask() throws Exception {
Settings settings = settingsBuilder()
.put("discovery.type", "local")
@ -299,63 +213,6 @@ public class ClusterServiceIT extends ESIntegTestCase {
assertThat(processedLatch.await(1, TimeUnit.SECONDS), equalTo(true));
}
public void testMasterAwareExecution() throws Exception {
Settings settings = settingsBuilder()
.put("discovery.type", "local")
.build();
InternalTestCluster.Async<String> master = internalCluster().startNodeAsync(settings);
InternalTestCluster.Async<String> nonMaster = internalCluster().startNodeAsync(settingsBuilder().put(settings).put(Node.NODE_MASTER_SETTING.getKey(), false).build());
master.get();
ensureGreen(); // make sure we have a cluster
ClusterService clusterService = internalCluster().getInstance(ClusterService.class, nonMaster.get());
final boolean[] taskFailed = {false};
final CountDownLatch latch1 = new CountDownLatch(1);
clusterService.submitStateUpdateTask("test", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
latch1.countDown();
return currentState;
}
@Override
public void onFailure(String source, Throwable t) {
taskFailed[0] = true;
latch1.countDown();
}
});
latch1.await();
assertTrue("cluster state update task was executed on a non-master", taskFailed[0]);
taskFailed[0] = true;
final CountDownLatch latch2 = new CountDownLatch(1);
clusterService.submitStateUpdateTask("test", new ClusterStateUpdateTask() {
@Override
public boolean runOnlyOnMaster() {
return false;
}
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
taskFailed[0] = false;
latch2.countDown();
return currentState;
}
@Override
public void onFailure(String source, Throwable t) {
taskFailed[0] = true;
latch2.countDown();
}
});
latch2.await();
assertFalse("non-master cluster state update task was not executed", taskFailed[0]);
}
public void testAckedUpdateTaskNoAckExpected() throws Exception {
Settings settings = settingsBuilder()
.put("discovery.type", "local")
@ -715,571 +572,6 @@ public class ClusterServiceIT extends ESIntegTestCase {
}
}
/**
* Note, this test can only work as long as we have a single thread executor executing the state update tasks!
*/
public void testPrioritizedTasks() throws Exception {
Settings settings = settingsBuilder()
.put("discovery.type", "local")
.build();
internalCluster().startNode(settings);
ClusterService clusterService = internalCluster().getInstance(ClusterService.class);
BlockingTask block = new BlockingTask(Priority.IMMEDIATE);
clusterService.submitStateUpdateTask("test", block);
int taskCount = randomIntBetween(5, 20);
Priority[] priorities = Priority.values();
// will hold all the tasks in the order in which they were executed
List<PrioritizedTask> tasks = new ArrayList<>(taskCount);
CountDownLatch latch = new CountDownLatch(taskCount);
for (int i = 0; i < taskCount; i++) {
Priority priority = priorities[randomIntBetween(0, priorities.length - 1)];
clusterService.submitStateUpdateTask("test", new PrioritizedTask(priority, latch, tasks));
}
block.release();
latch.await();
Priority prevPriority = null;
for (PrioritizedTask task : tasks) {
if (prevPriority == null) {
prevPriority = task.priority();
} else {
assertThat(task.priority().sameOrAfter(prevPriority), is(true));
}
}
}
/*
* test that a listener throwing an exception while handling a
* notification does not prevent publication notification to the
* executor
*/
public void testClusterStateTaskListenerThrowingExceptionIsOkay() throws InterruptedException {
Settings settings = settingsBuilder()
.put("discovery.type", "local")
.build();
internalCluster().startNode(settings);
ClusterService clusterService = internalCluster().getInstance(ClusterService.class);
final CountDownLatch latch = new CountDownLatch(1);
AtomicBoolean published = new AtomicBoolean();
clusterService.submitStateUpdateTask(
"testClusterStateTaskListenerThrowingExceptionIsOkay",
new Object(),
ClusterStateTaskConfig.build(Priority.NORMAL),
new ClusterStateTaskExecutor<Object>() {
@Override
public boolean runOnlyOnMaster() {
return false;
}
@Override
public BatchResult<Object> execute(ClusterState currentState, List<Object> tasks) throws Exception {
ClusterState newClusterState = ClusterState.builder(currentState).build();
return BatchResult.builder().successes(tasks).build(newClusterState);
}
@Override
public void clusterStatePublished(ClusterState newClusterState) {
published.set(true);
latch.countDown();
}
},
new ClusterStateTaskListener() {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
throw new IllegalStateException(source);
}
@Override
public void onFailure(String source, Throwable t) {
}
}
);
latch.await();
assertTrue(published.get());
}
// test that for a single thread, tasks are executed in the order
// that they are submitted
public void testClusterStateUpdateTasksAreExecutedInOrder() throws BrokenBarrierException, InterruptedException {
Settings settings = settingsBuilder()
.put("discovery.type", "local")
.build();
internalCluster().startNode(settings);
ClusterService clusterService = internalCluster().getInstance(ClusterService.class);
class TaskExecutor implements ClusterStateTaskExecutor<Integer> {
List<Integer> tasks = new ArrayList<>();
@Override
public BatchResult<Integer> execute(ClusterState currentState, List<Integer> tasks) throws Exception {
this.tasks.addAll(tasks);
return BatchResult.<Integer>builder().successes(tasks).build(ClusterState.builder(currentState).build());
}
@Override
public boolean runOnlyOnMaster() {
return false;
}
}
int numberOfThreads = randomIntBetween(2, 8);
TaskExecutor[] executors = new TaskExecutor[numberOfThreads];
for (int i = 0; i < numberOfThreads; i++) {
executors[i] = new TaskExecutor();
}
int tasksSubmittedPerThread = randomIntBetween(2, 1024);
CopyOnWriteArrayList<Tuple<String, Throwable>> failures = new CopyOnWriteArrayList<>();
CountDownLatch updateLatch = new CountDownLatch(numberOfThreads * tasksSubmittedPerThread);
ClusterStateTaskListener listener = new ClusterStateTaskListener() {
@Override
public void onFailure(String source, Throwable t) {
logger.error("unexpected failure: [{}]", t, source);
failures.add(new Tuple<>(source, t));
updateLatch.countDown();
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
updateLatch.countDown();
}
};
CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads);
for (int i = 0; i < numberOfThreads; i++) {
final int index = i;
Thread thread = new Thread(() -> {
try {
barrier.await();
for (int j = 0; j < tasksSubmittedPerThread; j++) {
clusterService.submitStateUpdateTask("[" + index + "][" + j + "]", j, ClusterStateTaskConfig.build(randomFrom(Priority.values())), executors[index], listener);
}
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
throw new AssertionError(e);
}
});
thread.start();
}
// wait for all threads to be ready
barrier.await();
// wait for all threads to finish
barrier.await();
updateLatch.await();
assertThat(failures, empty());
for (int i = 0; i < numberOfThreads; i++) {
assertEquals(tasksSubmittedPerThread, executors[i].tasks.size());
for (int j = 0; j < tasksSubmittedPerThread; j++) {
assertNotNull(executors[i].tasks.get(j));
assertEquals("cluster state update task executed out of order", j, (int)executors[i].tasks.get(j));
}
}
}
public void testClusterStateBatchedUpdates() throws BrokenBarrierException, InterruptedException {
Settings settings = settingsBuilder()
.put("discovery.type", "local")
.build();
internalCluster().startNode(settings);
ClusterService clusterService = internalCluster().getInstance(ClusterService.class);
AtomicInteger counter = new AtomicInteger();
class Task {
private AtomicBoolean state = new AtomicBoolean();
public void execute() {
if (!state.compareAndSet(false, true)) {
throw new IllegalStateException();
} else {
counter.incrementAndGet();
}
}
}
int numberOfThreads = randomIntBetween(2, 8);
int tasksSubmittedPerThread = randomIntBetween(1, 1024);
int numberOfExecutors = Math.max(1, numberOfThreads / 4);
final Semaphore semaphore = new Semaphore(numberOfExecutors);
class TaskExecutor implements ClusterStateTaskExecutor<Task> {
private AtomicInteger counter = new AtomicInteger();
private AtomicInteger batches = new AtomicInteger();
private AtomicInteger published = new AtomicInteger();
@Override
public BatchResult<Task> execute(ClusterState currentState, List<Task> tasks) throws Exception {
tasks.forEach(task -> task.execute());
counter.addAndGet(tasks.size());
ClusterState maybeUpdatedClusterState = currentState;
if (randomBoolean()) {
maybeUpdatedClusterState = ClusterState.builder(currentState).build();
batches.incrementAndGet();
semaphore.acquire();
}
return BatchResult.<Task>builder().successes(tasks).build(maybeUpdatedClusterState);
}
@Override
public boolean runOnlyOnMaster() {
return false;
}
@Override
public void clusterStatePublished(ClusterState newClusterState) {
published.incrementAndGet();
semaphore.release();
}
}
ConcurrentMap<String, AtomicInteger> counters = new ConcurrentHashMap<>();
CountDownLatch updateLatch = new CountDownLatch(numberOfThreads * tasksSubmittedPerThread);
ClusterStateTaskListener listener = new ClusterStateTaskListener() {
@Override
public void onFailure(String source, Throwable t) {
assert false;
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
counters.computeIfAbsent(source, key -> new AtomicInteger()).incrementAndGet();
updateLatch.countDown();
}
};
List<TaskExecutor> executors = new ArrayList<>();
for (int i = 0; i < numberOfExecutors; i++) {
executors.add(new TaskExecutor());
}
// randomly assign tasks to executors
List<TaskExecutor> assignments = new ArrayList<>();
for (int i = 0; i < numberOfThreads; i++) {
for (int j = 0; j < tasksSubmittedPerThread; j++) {
assignments.add(randomFrom(executors));
}
}
Map<TaskExecutor, Integer> counts = new HashMap<>();
for (TaskExecutor executor : assignments) {
counts.merge(executor, 1, (previous, one) -> previous + one);
}
CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads);
for (int i = 0; i < numberOfThreads; i++) {
final int index = i;
Thread thread = new Thread(() -> {
try {
barrier.await();
for (int j = 0; j < tasksSubmittedPerThread; j++) {
ClusterStateTaskExecutor<Task> executor = assignments.get(index * tasksSubmittedPerThread + j);
clusterService.submitStateUpdateTask(
Thread.currentThread().getName(),
new Task(),
ClusterStateTaskConfig.build(randomFrom(Priority.values())),
executor,
listener);
}
barrier.await();
} catch (BrokenBarrierException | InterruptedException e) {
throw new AssertionError(e);
}
});
thread.start();
}
// wait for all threads to be ready
barrier.await();
// wait for all threads to finish
barrier.await();
// wait until all the cluster state updates have been processed
updateLatch.await();
// and until all of the publication callbacks have completed
semaphore.acquire(numberOfExecutors);
// assert the number of executed tasks is correct
assertEquals(numberOfThreads * tasksSubmittedPerThread, counter.get());
// assert each executor executed the correct number of tasks
for (TaskExecutor executor : executors) {
if (counts.containsKey(executor)) {
assertEquals((int) counts.get(executor), executor.counter.get());
assertEquals(executor.batches.get(), executor.published.get());
}
}
// assert the correct number of clusterStateProcessed events were triggered
for (Map.Entry<String, AtomicInteger> entry : counters.entrySet()) {
assertEquals(entry.getValue().get(), tasksSubmittedPerThread);
}
}
@TestLogging("cluster:TRACE") // To ensure that we log cluster state events on TRACE level
public void testClusterStateUpdateLogging() throws Exception {
Settings settings = settingsBuilder()
.put("discovery.type", "local")
.build();
internalCluster().startNode(settings);
ClusterService clusterService1 = internalCluster().getInstance(ClusterService.class);
MockLogAppender mockAppender = new MockLogAppender();
mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test1", "cluster.service", Level.DEBUG, "*processing [test1]: took * no change in cluster_state"));
mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test2", "cluster.service", Level.TRACE, "*failed to execute cluster state update in *"));
mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test3", "cluster.service", Level.DEBUG, "*processing [test3]: took * done applying updated cluster_state (version: *, uuid: *)"));
Logger rootLogger = Logger.getRootLogger();
rootLogger.addAppender(mockAppender);
try {
final CountDownLatch latch = new CountDownLatch(4);
clusterService1.submitStateUpdateTask("test1", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
return currentState;
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
latch.countDown();
}
@Override
public void onFailure(String source, Throwable t) {
fail();
}
});
clusterService1.submitStateUpdateTask("test2", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
throw new IllegalArgumentException("Testing handling of exceptions in the cluster state task");
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
fail();
}
@Override
public void onFailure(String source, Throwable t) {
latch.countDown();
}
});
clusterService1.submitStateUpdateTask("test3", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
return ClusterState.builder(currentState).incrementVersion().build();
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
latch.countDown();
}
@Override
public void onFailure(String source, Throwable t) {
fail();
}
});
// Additional update task to make sure all previous logging made it to the logger
// We don't check logging for this on since there is no guarantee that it will occur before our check
clusterService1.submitStateUpdateTask("test4", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
return currentState;
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
latch.countDown();
}
@Override
public void onFailure(String source, Throwable t) {
fail();
}
});
assertThat(latch.await(1, TimeUnit.SECONDS), equalTo(true));
} finally {
rootLogger.removeAppender(mockAppender);
}
mockAppender.assertAllExpectationsMatched();
}
@TestLogging("cluster:WARN") // To ensure that we log cluster state events on WARN level
public void testLongClusterStateUpdateLogging() throws Exception {
Settings settings = settingsBuilder()
.put("discovery.type", "local")
.put(InternalClusterService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.getKey(), "10s")
.build();
internalCluster().startNode(settings);
ClusterService clusterService1 = internalCluster().getInstance(ClusterService.class);
MockLogAppender mockAppender = new MockLogAppender();
mockAppender.addExpectation(new MockLogAppender.UnseenEventExpectation("test1 shouldn't see because setting is too low", "cluster.service", Level.WARN, "*cluster state update task [test1] took * above the warn threshold of *"));
mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test2", "cluster.service", Level.WARN, "*cluster state update task [test2] took * above the warn threshold of 10ms"));
mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test3", "cluster.service", Level.WARN, "*cluster state update task [test3] took * above the warn threshold of 10ms"));
mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test4", "cluster.service", Level.WARN, "*cluster state update task [test4] took * above the warn threshold of 10ms"));
Logger rootLogger = Logger.getRootLogger();
rootLogger.addAppender(mockAppender);
try {
final CountDownLatch latch = new CountDownLatch(5);
final CountDownLatch processedFirstTask = new CountDownLatch(1);
clusterService1.submitStateUpdateTask("test1", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
Thread.sleep(100);
return currentState;
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
latch.countDown();
processedFirstTask.countDown();
}
@Override
public void onFailure(String source, Throwable t) {
fail();
}
});
processedFirstTask.await(1, TimeUnit.SECONDS);
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder()
.put(InternalClusterService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.getKey(), "10ms")));
clusterService1.submitStateUpdateTask("test2", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
Thread.sleep(100);
throw new IllegalArgumentException("Testing handling of exceptions in the cluster state task");
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
fail();
}
@Override
public void onFailure(String source, Throwable t) {
latch.countDown();
}
});
clusterService1.submitStateUpdateTask("test3", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
Thread.sleep(100);
return ClusterState.builder(currentState).incrementVersion().build();
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
latch.countDown();
}
@Override
public void onFailure(String source, Throwable t) {
fail();
}
});
clusterService1.submitStateUpdateTask("test4", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
Thread.sleep(100);
return currentState;
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
latch.countDown();
}
@Override
public void onFailure(String source, Throwable t) {
fail();
}
});
// Additional update task to make sure all previous logging made it to the logger
// We don't check logging for this on since there is no guarantee that it will occur before our check
clusterService1.submitStateUpdateTask("test5", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
return currentState;
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
latch.countDown();
}
@Override
public void onFailure(String source, Throwable t) {
fail();
}
});
assertThat(latch.await(5, TimeUnit.SECONDS), equalTo(true));
} finally {
rootLogger.removeAppender(mockAppender);
}
mockAppender.assertAllExpectationsMatched();
}
private static class BlockingTask extends ClusterStateUpdateTask {
private final CountDownLatch latch = new CountDownLatch(1);
public BlockingTask(Priority priority) {
super(priority);
}
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
latch.await();
return currentState;
}
@Override
public void onFailure(String source, Throwable t) {
}
public void release() {
latch.countDown();
}
}
private static class PrioritizedTask extends ClusterStateUpdateTask {
private final CountDownLatch latch;
private final List<PrioritizedTask> tasks;
private PrioritizedTask(Priority priority, CountDownLatch latch, List<PrioritizedTask> tasks) {
super(priority);
this.latch = latch;
this.tasks = tasks;
}
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
tasks.add(this);
latch.countDown();
return currentState;
}
@Override
public void onFailure(String source, Throwable t) {
latch.countDown();
}
}
public static class TestPlugin extends Plugin {
@Override

View File

@ -0,0 +1,275 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.TransportServiceAdapter;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.Matchers.equalTo;
public class NodeConnectionsServiceTests extends ESTestCase {
private static ThreadPool THREAD_POOL;
private MockTransport transport;
private TransportService transportService;
private List<DiscoveryNode> generateNodes() {
List<DiscoveryNode> nodes = new ArrayList<>();
for (int i = randomIntBetween(20, 50); i > 0; i--) {
final HashMap<String, String> attributes = new HashMap<>();
if (rarely()) {
attributes.put("client", "true");
} else {
attributes.put("master", "" + randomBoolean());
attributes.put("data", "" + randomBoolean());
attributes.put("ingest", "" + randomBoolean());
}
nodes.add(new DiscoveryNode("node_" + i, "" + i, DummyTransportAddress.INSTANCE, attributes, Version.CURRENT));
}
return nodes;
}
private ClusterState clusterStateFromNodes(List<DiscoveryNode> nodes) {
final DiscoveryNodes.Builder builder = DiscoveryNodes.builder();
for (DiscoveryNode node : nodes) {
builder.put(node);
}
return ClusterState.builder(new ClusterName("test")).nodes(builder).build();
}
public void testConnectAndDisconnect() {
List<DiscoveryNode> nodes = generateNodes();
NodeConnectionsService service = new NodeConnectionsService(Settings.EMPTY, THREAD_POOL, transportService);
ClusterState current = clusterStateFromNodes(Collections.emptyList());
ClusterChangedEvent event = new ClusterChangedEvent("test", clusterStateFromNodes(randomSubsetOf(nodes)), current);
service.connectToAddedNodes(event);
assertConnected(event.nodesDelta().addedNodes());
service.disconnectFromRemovedNodes(event);
assertConnectedExactlyToNodes(event.state());
current = event.state();
event = new ClusterChangedEvent("test", clusterStateFromNodes(randomSubsetOf(nodes)), current);
service.connectToAddedNodes(event);
assertConnected(event.nodesDelta().addedNodes());
service.disconnectFromRemovedNodes(event);
assertConnectedExactlyToNodes(event.state());
}
public void testReconnect() {
List<DiscoveryNode> nodes = generateNodes();
NodeConnectionsService service = new NodeConnectionsService(Settings.EMPTY, THREAD_POOL, transportService);
ClusterState current = clusterStateFromNodes(Collections.emptyList());
ClusterChangedEvent event = new ClusterChangedEvent("test", clusterStateFromNodes(randomSubsetOf(nodes)), current);
transport.randomConnectionExceptions = true;
service.connectToAddedNodes(event);
for (int i = 0; i < 3; i++) {
// simulate disconnects
for (DiscoveryNode node : randomSubsetOf(nodes)) {
transport.disconnectFromNode(node);
}
service.new ConnectionChecker().run();
}
// disable exceptions so things can be restored
transport.randomConnectionExceptions = false;
service.new ConnectionChecker().run();
assertConnectedExactlyToNodes(event.state());
}
private void assertConnectedExactlyToNodes(ClusterState state) {
assertConnected(state.nodes());
assertThat(transport.connectedNodes.size(), equalTo(state.nodes().size()));
}
private void assertConnected(Iterable<DiscoveryNode> nodes) {
for (DiscoveryNode node : nodes) {
assertTrue("not connected to " + node, transport.connectedNodes.contains(node));
}
}
private void assertNotConnected(Iterable<DiscoveryNode> nodes) {
for (DiscoveryNode node : nodes) {
assertFalse("still connected to " + node, transport.connectedNodes.contains(node));
}
}
@Override
@Before
public void setUp() throws Exception {
super.setUp();
this.transport = new MockTransport();
transportService = new TransportService(transport, THREAD_POOL);
transportService.start();
transportService.acceptIncomingRequests();
}
@Override
@After
public void tearDown() throws Exception {
transportService.stop();
super.tearDown();
}
@AfterClass
public static void stopThreadPool() {
ThreadPool.terminate(THREAD_POOL, 30, TimeUnit.SECONDS);
THREAD_POOL = null;
}
final class MockTransport implements Transport {
Set<DiscoveryNode> connectedNodes = ConcurrentCollections.newConcurrentSet();
volatile boolean randomConnectionExceptions = false;
@Override
public void transportServiceAdapter(TransportServiceAdapter service) {
}
@Override
public BoundTransportAddress boundAddress() {
return null;
}
@Override
public Map<String, BoundTransportAddress> profileBoundAddresses() {
return null;
}
@Override
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws Exception {
return new TransportAddress[0];
}
@Override
public boolean addressSupported(Class<? extends TransportAddress> address) {
return false;
}
@Override
public boolean nodeConnected(DiscoveryNode node) {
return connectedNodes.contains(node);
}
@Override
public void connectToNode(DiscoveryNode node) throws ConnectTransportException {
if (connectedNodes.contains(node) == false && randomConnectionExceptions && randomBoolean()) {
throw new ConnectTransportException(node, "simulated");
}
connectedNodes.add(node);
}
@Override
public void connectToNodeLight(DiscoveryNode node) throws ConnectTransportException {
}
@Override
public void disconnectFromNode(DiscoveryNode node) {
connectedNodes.remove(node);
}
@Override
public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request,
TransportRequestOptions options) throws IOException, TransportException {
}
@Override
public long serverOpen() {
return 0;
}
@Override
public List<String> getLocalAddresses() {
return null;
}
@Override
public Lifecycle.State lifecycleState() {
return null;
}
@Override
public void addLifecycleListener(LifecycleListener listener) {
}
@Override
public void removeLifecycleListener(LifecycleListener listener) {
}
@Override
public Transport start() {
return null;
}
@Override
public Transport stop() {
return null;
}
@Override
public void close() {
}
}
}

View File

@ -27,6 +27,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodeService;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
@ -40,7 +41,6 @@ import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
import org.elasticsearch.cluster.service.InternalClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.test.ESAllocationTestCase;
@ -305,7 +305,7 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa
return randomSubsetOf(1, shards.toArray(new ShardRouting[0])).get(0);
} else {
return
TestShardRouting.newShardRouting(shardRouting.index(), shardRouting.id(), InternalClusterService.generateNodeId(Settings.EMPTY), randomBoolean(), randomFrom(ShardRoutingState.values()));
TestShardRouting.newShardRouting(shardRouting.index(), shardRouting.id(), DiscoveryNodeService.generateNodeId(Settings.EMPTY), randomBoolean(), randomFrom(ShardRoutingState.values()));
}
}

View File

@ -0,0 +1,824 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.service;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.NodeConnectionsService;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.OperationRouting;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.MockLogAppender;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
public class ClusterServiceTests extends ESTestCase {
static ThreadPool threadPool;
TimedClusterService clusterService;
@BeforeClass
public static void createThreadPool() {
threadPool = new ThreadPool(ClusterServiceTests.class.getName());
}
@AfterClass
public static void stopThreadPool() {
if (threadPool != null) {
threadPool.shutdownNow();
threadPool = null;
}
}
@Before
public void setUp() throws Exception {
super.setUp();
clusterService = createClusterService(true);
}
@After
public void tearDown() throws Exception {
clusterService.close();
super.tearDown();
}
TimedClusterService createClusterService(boolean makeMaster) throws InterruptedException {
TimedClusterService test = new TimedClusterService(Settings.EMPTY, null,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
threadPool, new ClusterName("ClusterServiceTests"));
test.setLocalNode(new DiscoveryNode("node1", DummyTransportAddress.INSTANCE, Version.CURRENT));
test.setNodeConnectionsService(new NodeConnectionsService(Settings.EMPTY, null, null) {
@Override
public void connectToAddedNodes(ClusterChangedEvent event) {
// skip
}
@Override
public void disconnectFromRemovedNodes(ClusterChangedEvent event) {
// skip
}
});
test.setClusterStatePublisher((event, ackListener) -> {
});
test.start();
CountDownLatch latch = new CountDownLatch(1);
test.submitStateUpdateTask("making a master", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
final DiscoveryNodes nodes = currentState.nodes();
final DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(nodes)
.masterNodeId(makeMaster ? nodes.localNodeId() : null);
return ClusterState.builder(currentState).blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK).nodes(nodesBuilder).build();
}
@Override
public boolean runOnlyOnMaster() {
return false;
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
latch.countDown();
}
@Override
public void onFailure(String source, Throwable t) {
logger.warn("unexpected exception", t);
fail("unexpected exception" + t);
}
});
latch.await();
return test;
}
public void testTimeoutUpdateTask() throws Exception {
final CountDownLatch block = new CountDownLatch(1);
clusterService.submitStateUpdateTask("test1", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
try {
block.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return currentState;
}
@Override
public void onFailure(String source, Throwable t) {
throw new RuntimeException(t);
}
});
final CountDownLatch timedOut = new CountDownLatch(1);
final AtomicBoolean executeCalled = new AtomicBoolean();
clusterService.submitStateUpdateTask("test2", new ClusterStateUpdateTask() {
@Override
public TimeValue timeout() {
return TimeValue.timeValueMillis(2);
}
@Override
public void onFailure(String source, Throwable t) {
timedOut.countDown();
}
@Override
public ClusterState execute(ClusterState currentState) {
executeCalled.set(true);
return currentState;
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
}
});
timedOut.await();
block.countDown();
final CountDownLatch allProcessed = new CountDownLatch(1);
clusterService.submitStateUpdateTask("test3", new ClusterStateUpdateTask() {
@Override
public void onFailure(String source, Throwable t) {
throw new RuntimeException(t);
}
@Override
public ClusterState execute(ClusterState currentState) {
allProcessed.countDown();
return currentState;
}
});
allProcessed.await(); // executed another task to double check that execute on the timed out update task is not called...
assertThat(executeCalled.get(), equalTo(false));
}
public void testMasterAwareExecution() throws Exception {
ClusterService nonMaster = createClusterService(false);
final boolean[] taskFailed = {false};
final CountDownLatch latch1 = new CountDownLatch(1);
nonMaster.submitStateUpdateTask("test", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
latch1.countDown();
return currentState;
}
@Override
public void onFailure(String source, Throwable t) {
taskFailed[0] = true;
latch1.countDown();
}
});
latch1.await();
assertTrue("cluster state update task was executed on a non-master", taskFailed[0]);
taskFailed[0] = true;
final CountDownLatch latch2 = new CountDownLatch(1);
nonMaster.submitStateUpdateTask("test", new ClusterStateUpdateTask() {
@Override
public boolean runOnlyOnMaster() {
return false;
}
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
taskFailed[0] = false;
latch2.countDown();
return currentState;
}
@Override
public void onFailure(String source, Throwable t) {
taskFailed[0] = true;
latch2.countDown();
}
});
latch2.await();
assertFalse("non-master cluster state update task was not executed", taskFailed[0]);
nonMaster.close();
}
/*
* test that a listener throwing an exception while handling a
* notification does not prevent publication notification to the
* executor
*/
public void testClusterStateTaskListenerThrowingExceptionIsOkay() throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
AtomicBoolean published = new AtomicBoolean();
clusterService.submitStateUpdateTask(
"testClusterStateTaskListenerThrowingExceptionIsOkay",
new Object(),
ClusterStateTaskConfig.build(Priority.NORMAL),
new ClusterStateTaskExecutor<Object>() {
@Override
public boolean runOnlyOnMaster() {
return false;
}
@Override
public BatchResult<Object> execute(ClusterState currentState, List<Object> tasks) throws Exception {
ClusterState newClusterState = ClusterState.builder(currentState).build();
return BatchResult.builder().successes(tasks).build(newClusterState);
}
@Override
public void clusterStatePublished(ClusterState newClusterState) {
published.set(true);
latch.countDown();
}
},
new ClusterStateTaskListener() {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
throw new IllegalStateException(source);
}
@Override
public void onFailure(String source, Throwable t) {
}
}
);
latch.await();
assertTrue(published.get());
}
// test that for a single thread, tasks are executed in the order
// that they are submitted
public void testClusterStateUpdateTasksAreExecutedInOrder() throws BrokenBarrierException, InterruptedException {
class TaskExecutor implements ClusterStateTaskExecutor<Integer> {
List<Integer> tasks = new ArrayList<>();
@Override
public BatchResult<Integer> execute(ClusterState currentState, List<Integer> tasks) throws Exception {
this.tasks.addAll(tasks);
return BatchResult.<Integer>builder().successes(tasks).build(ClusterState.builder(currentState).build());
}
@Override
public boolean runOnlyOnMaster() {
return false;
}
}
int numberOfThreads = randomIntBetween(2, 8);
TaskExecutor[] executors = new TaskExecutor[numberOfThreads];
for (int i = 0; i < numberOfThreads; i++) {
executors[i] = new TaskExecutor();
}
int tasksSubmittedPerThread = randomIntBetween(2, 1024);
CopyOnWriteArrayList<Tuple<String, Throwable>> failures = new CopyOnWriteArrayList<>();
CountDownLatch updateLatch = new CountDownLatch(numberOfThreads * tasksSubmittedPerThread);
ClusterStateTaskListener listener = new ClusterStateTaskListener() {
@Override
public void onFailure(String source, Throwable t) {
logger.error("unexpected failure: [{}]", t, source);
failures.add(new Tuple<>(source, t));
updateLatch.countDown();
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
updateLatch.countDown();
}
};
CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads);
for (int i = 0; i < numberOfThreads; i++) {
final int index = i;
Thread thread = new Thread(() -> {
try {
barrier.await();
for (int j = 0; j < tasksSubmittedPerThread; j++) {
clusterService.submitStateUpdateTask("[" + index + "][" + j + "]", j,
ClusterStateTaskConfig.build(randomFrom(Priority.values())), executors[index], listener);
}
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
throw new AssertionError(e);
}
});
thread.start();
}
// wait for all threads to be ready
barrier.await();
// wait for all threads to finish
barrier.await();
updateLatch.await();
assertThat(failures, empty());
for (int i = 0; i < numberOfThreads; i++) {
assertEquals(tasksSubmittedPerThread, executors[i].tasks.size());
for (int j = 0; j < tasksSubmittedPerThread; j++) {
assertNotNull(executors[i].tasks.get(j));
assertEquals("cluster state update task executed out of order", j, (int) executors[i].tasks.get(j));
}
}
}
public void testClusterStateBatchedUpdates() throws BrokenBarrierException, InterruptedException {
AtomicInteger counter = new AtomicInteger();
class Task {
private AtomicBoolean state = new AtomicBoolean();
public void execute() {
if (!state.compareAndSet(false, true)) {
throw new IllegalStateException();
} else {
counter.incrementAndGet();
}
}
}
int numberOfThreads = randomIntBetween(2, 8);
int tasksSubmittedPerThread = randomIntBetween(1, 1024);
int numberOfExecutors = Math.max(1, numberOfThreads / 4);
final Semaphore semaphore = new Semaphore(numberOfExecutors);
class TaskExecutor implements ClusterStateTaskExecutor<Task> {
private AtomicInteger counter = new AtomicInteger();
private AtomicInteger batches = new AtomicInteger();
private AtomicInteger published = new AtomicInteger();
@Override
public BatchResult<Task> execute(ClusterState currentState, List<Task> tasks) throws Exception {
tasks.forEach(task -> task.execute());
counter.addAndGet(tasks.size());
ClusterState maybeUpdatedClusterState = currentState;
if (randomBoolean()) {
maybeUpdatedClusterState = ClusterState.builder(currentState).build();
batches.incrementAndGet();
semaphore.acquire();
}
return BatchResult.<Task>builder().successes(tasks).build(maybeUpdatedClusterState);
}
@Override
public boolean runOnlyOnMaster() {
return false;
}
@Override
public void clusterStatePublished(ClusterState newClusterState) {
published.incrementAndGet();
semaphore.release();
}
}
ConcurrentMap<String, AtomicInteger> counters = new ConcurrentHashMap<>();
CountDownLatch updateLatch = new CountDownLatch(numberOfThreads * tasksSubmittedPerThread);
ClusterStateTaskListener listener = new ClusterStateTaskListener() {
@Override
public void onFailure(String source, Throwable t) {
assert false;
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
counters.computeIfAbsent(source, key -> new AtomicInteger()).incrementAndGet();
updateLatch.countDown();
}
};
List<TaskExecutor> executors = new ArrayList<>();
for (int i = 0; i < numberOfExecutors; i++) {
executors.add(new TaskExecutor());
}
// randomly assign tasks to executors
List<TaskExecutor> assignments = new ArrayList<>();
for (int i = 0; i < numberOfThreads; i++) {
for (int j = 0; j < tasksSubmittedPerThread; j++) {
assignments.add(randomFrom(executors));
}
}
Map<TaskExecutor, Integer> counts = new HashMap<>();
for (TaskExecutor executor : assignments) {
counts.merge(executor, 1, (previous, one) -> previous + one);
}
CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads);
for (int i = 0; i < numberOfThreads; i++) {
final int index = i;
Thread thread = new Thread(() -> {
try {
barrier.await();
for (int j = 0; j < tasksSubmittedPerThread; j++) {
ClusterStateTaskExecutor<Task> executor = assignments.get(index * tasksSubmittedPerThread + j);
clusterService.submitStateUpdateTask(
Thread.currentThread().getName(),
new Task(),
ClusterStateTaskConfig.build(randomFrom(Priority.values())),
executor,
listener);
}
barrier.await();
} catch (BrokenBarrierException | InterruptedException e) {
throw new AssertionError(e);
}
});
thread.start();
}
// wait for all threads to be ready
barrier.await();
// wait for all threads to finish
barrier.await();
// wait until all the cluster state updates have been processed
updateLatch.await();
// and until all of the publication callbacks have completed
semaphore.acquire(numberOfExecutors);
// assert the number of executed tasks is correct
assertEquals(numberOfThreads * tasksSubmittedPerThread, counter.get());
// assert each executor executed the correct number of tasks
for (TaskExecutor executor : executors) {
if (counts.containsKey(executor)) {
assertEquals((int) counts.get(executor), executor.counter.get());
assertEquals(executor.batches.get(), executor.published.get());
}
}
// assert the correct number of clusterStateProcessed events were triggered
for (Map.Entry<String, AtomicInteger> entry : counters.entrySet()) {
assertEquals(entry.getValue().get(), tasksSubmittedPerThread);
}
}
/**
* Note, this test can only work as long as we have a single thread executor executing the state update tasks!
*/
public void testPrioritizedTasks() throws Exception {
Settings settings = settingsBuilder()
.put("discovery.type", "local")
.build();
BlockingTask block = new BlockingTask(Priority.IMMEDIATE);
clusterService.submitStateUpdateTask("test", block);
int taskCount = randomIntBetween(5, 20);
Priority[] priorities = Priority.values();
// will hold all the tasks in the order in which they were executed
List<PrioritizedTask> tasks = new ArrayList<>(taskCount);
CountDownLatch latch = new CountDownLatch(taskCount);
for (int i = 0; i < taskCount; i++) {
Priority priority = priorities[randomIntBetween(0, priorities.length - 1)];
clusterService.submitStateUpdateTask("test", new PrioritizedTask(priority, latch, tasks));
}
block.release();
latch.await();
Priority prevPriority = null;
for (PrioritizedTask task : tasks) {
if (prevPriority == null) {
prevPriority = task.priority();
} else {
assertThat(task.priority().sameOrAfter(prevPriority), is(true));
}
}
}
@TestLogging("cluster:TRACE") // To ensure that we log cluster state events on TRACE level
public void testClusterStateUpdateLogging() throws Exception {
MockLogAppender mockAppender = new MockLogAppender();
mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test1", "cluster.service", Level.DEBUG,
"*processing [test1]: took [1s] no change in cluster_state"));
mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test2", "cluster.service", Level.TRACE,
"*failed to execute cluster state update in [2s]*"));
mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test3", "cluster.service", Level.DEBUG,
"*processing [test3]: took [3s] done applying updated cluster_state (version: *, uuid: *)"));
Logger rootLogger = Logger.getRootLogger();
rootLogger.addAppender(mockAppender);
try {
final CountDownLatch latch = new CountDownLatch(4);
clusterService.currentTimeOverride = System.nanoTime();
clusterService.submitStateUpdateTask("test1", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
clusterService.currentTimeOverride += TimeValue.timeValueSeconds(1).nanos();
return currentState;
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
latch.countDown();
}
@Override
public void onFailure(String source, Throwable t) {
fail();
}
});
clusterService.submitStateUpdateTask("test2", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
clusterService.currentTimeOverride += TimeValue.timeValueSeconds(2).nanos();
throw new IllegalArgumentException("Testing handling of exceptions in the cluster state task");
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
fail();
}
@Override
public void onFailure(String source, Throwable t) {
latch.countDown();
}
});
clusterService.submitStateUpdateTask("test3", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
clusterService.currentTimeOverride += TimeValue.timeValueSeconds(3).nanos();
return ClusterState.builder(currentState).incrementVersion().build();
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
latch.countDown();
}
@Override
public void onFailure(String source, Throwable t) {
fail();
}
});
// Additional update task to make sure all previous logging made it to the logger
// We don't check logging for this on since there is no guarantee that it will occur before our check
clusterService.submitStateUpdateTask("test4", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
return currentState;
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
latch.countDown();
}
@Override
public void onFailure(String source, Throwable t) {
fail();
}
});
latch.await();
} finally {
rootLogger.removeAppender(mockAppender);
}
mockAppender.assertAllExpectationsMatched();
}
@TestLogging("cluster:WARN") // To ensure that we log cluster state events on WARN level
public void testLongClusterStateUpdateLogging() throws Exception {
MockLogAppender mockAppender = new MockLogAppender();
mockAppender.addExpectation(new MockLogAppender.UnseenEventExpectation("test1 shouldn't see because setting is too low",
"cluster.service", Level.WARN, "*cluster state update task [test1] took [*] above the warn threshold of *"));
mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test2", "cluster.service", Level.WARN,
"*cluster state update task [test2] took [32s] above the warn threshold of *"));
mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test3", "cluster.service", Level.WARN,
"*cluster state update task [test3] took [33s] above the warn threshold of *"));
mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test4", "cluster.service", Level.WARN,
"*cluster state update task [test4] took [34s] above the warn threshold of *"));
Logger rootLogger = Logger.getRootLogger();
rootLogger.addAppender(mockAppender);
try {
final CountDownLatch latch = new CountDownLatch(5);
final CountDownLatch processedFirstTask = new CountDownLatch(1);
clusterService.currentTimeOverride = System.nanoTime();
clusterService.submitStateUpdateTask("test1", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
clusterService.currentTimeOverride += TimeValue.timeValueSeconds(1).nanos();
return currentState;
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
latch.countDown();
processedFirstTask.countDown();
}
@Override
public void onFailure(String source, Throwable t) {
fail();
}
});
processedFirstTask.await();
clusterService.submitStateUpdateTask("test2", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
clusterService.currentTimeOverride += TimeValue.timeValueSeconds(32).nanos();
throw new IllegalArgumentException("Testing handling of exceptions in the cluster state task");
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
fail();
}
@Override
public void onFailure(String source, Throwable t) {
latch.countDown();
}
});
clusterService.submitStateUpdateTask("test3", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
clusterService.currentTimeOverride += TimeValue.timeValueSeconds(33).nanos();
return ClusterState.builder(currentState).incrementVersion().build();
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
latch.countDown();
}
@Override
public void onFailure(String source, Throwable t) {
fail();
}
});
clusterService.submitStateUpdateTask("test4", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
clusterService.currentTimeOverride += TimeValue.timeValueSeconds(34).nanos();
return currentState;
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
latch.countDown();
}
@Override
public void onFailure(String source, Throwable t) {
fail();
}
});
// Additional update task to make sure all previous logging made it to the logger
// We don't check logging for this on since there is no guarantee that it will occur before our check
clusterService.submitStateUpdateTask("test5", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
return currentState;
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
latch.countDown();
}
@Override
public void onFailure(String source, Throwable t) {
fail();
}
});
latch.await();
} finally {
rootLogger.removeAppender(mockAppender);
}
mockAppender.assertAllExpectationsMatched();
}
private static class BlockingTask extends ClusterStateUpdateTask {
private final CountDownLatch latch = new CountDownLatch(1);
public BlockingTask(Priority priority) {
super(priority);
}
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
latch.await();
return currentState;
}
@Override
public void onFailure(String source, Throwable t) {
}
public void release() {
latch.countDown();
}
}
private static class PrioritizedTask extends ClusterStateUpdateTask {
private final CountDownLatch latch;
private final List<PrioritizedTask> tasks;
private PrioritizedTask(Priority priority, CountDownLatch latch, List<PrioritizedTask> tasks) {
super(priority);
this.latch = latch;
this.tasks = tasks;
}
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
tasks.add(this);
latch.countDown();
return currentState;
}
@Override
public void onFailure(String source, Throwable t) {
latch.countDown();
}
}
static class TimedClusterService extends InternalClusterService {
public volatile Long currentTimeOverride = null;
public TimedClusterService(Settings settings, OperationRouting operationRouting, ClusterSettings clusterSettings,
ThreadPool threadPool, ClusterName clusterName) {
super(settings, operationRouting, clusterSettings, threadPool, clusterName);
}
@Override
protected long currentTimeInNanos() {
if (currentTimeOverride != null) {
return currentTimeOverride;
}
return super.currentTimeInNanos();
}
}
}

View File

@ -80,7 +80,7 @@ public class MockLogAppender extends AppenderSkeleton {
protected final String logger;
protected final Level level;
protected final String message;
protected boolean saw;
volatile boolean saw;
public AbstractEventExpectation(String name, String logger, Level level, String message) {
this.name = name;

View File

@ -19,6 +19,7 @@
package org.elasticsearch.transport.netty;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.util.concurrent.KeyedLock;
import org.elasticsearch.test.ESTestCase;
import org.hamcrest.Matchers;
@ -29,9 +30,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
public class KeyedLockTests extends ESTestCase {
@ -68,28 +67,6 @@ public class KeyedLockTests extends ESTestCase {
}
}
public void testCannotAcquireTwoLocks() throws InterruptedException {
KeyedLock<String> connectionLock = new KeyedLock<String>();
String name = randomRealisticUnicodeOfLength(scaledRandomIntBetween(10, 50));
connectionLock.acquire(name);
try {
connectionLock.acquire(name);
fail("Expected IllegalStateException");
} catch (IllegalStateException e) {
assertThat(e.getMessage(), containsString("Lock already acquired"));
}
}
public void testCannotReleaseUnacquiredLock() throws InterruptedException {
KeyedLock<String> connectionLock = new KeyedLock<String>();
String name = randomRealisticUnicodeOfLength(scaledRandomIntBetween(10, 50));
try {
connectionLock.release(name);
fail("Expected IllegalStateException");
} catch (IllegalStateException e) {
assertThat(e.getMessage(), is("Lock not acquired"));
}
}
public static class AcquireAndReleaseThread extends Thread {
private CountDownLatch startLatch;
@ -117,16 +94,16 @@ public class KeyedLockTests extends ESTestCase {
int numRuns = scaledRandomIntBetween(5000, 50000);
for (int i = 0; i < numRuns; i++) {
String curName = names[randomInt(names.length - 1)];
connectionLock.acquire(curName);
try {
assert connectionLock.isHeldByCurrentThread(curName) == false;
try (Releasable ignored = connectionLock.acquire(curName)) {
assert connectionLock.isHeldByCurrentThread(curName);
assert connectionLock.isHeldByCurrentThread(curName + "bla") == false;
Integer integer = counter.get(curName);
if (integer == null) {
counter.put(curName, 1);
} else {
counter.put(curName, integer.intValue() + 1);
}
} finally {
connectionLock.release(curName);
}
AtomicInteger atomicInteger = new AtomicInteger(0);
AtomicInteger value = safeCounter.putIfAbsent(curName, atomicInteger);

View File

@ -23,7 +23,7 @@ import org.apache.lucene.util.IOUtils;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.InternalClusterService;
import org.elasticsearch.cluster.node.DiscoveryNodeService;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
@ -66,14 +66,14 @@ public class TribeUnitTests extends ESTestCase {
.put(baseSettings)
.put("cluster.name", "tribe1")
.put("node.name", "tribe1_node")
.put(InternalClusterService.NODE_ID_SEED_SETTING.getKey(), random().nextLong())
.put(DiscoveryNodeService.NODE_ID_SEED_SETTING.getKey(), random().nextLong())
.build()).start();
tribe2 = new TribeClientNode(
Settings.builder()
.put(baseSettings)
.put("cluster.name", "tribe2")
.put("node.name", "tribe2_node")
.put(InternalClusterService.NODE_ID_SEED_SETTING.getKey(), random().nextLong())
.put(DiscoveryNodeService.NODE_ID_SEED_SETTING.getKey(), random().nextLong())
.build()).start();
}

View File

@ -67,6 +67,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Random;
@ -285,7 +286,7 @@ public abstract class ESTestCase extends LuceneTestCase {
* @param start lower bound of interval to draw uniformly distributed random numbers from
* @param end upper bound
* @param lowerInclusive whether or not to include lower end of the interval
* */
*/
public static double randomDoubleBetween(double start, double end, boolean lowerInclusive) {
double result = 0.0;
@ -555,12 +556,27 @@ public abstract class ESTestCase extends LuceneTestCase {
* Returns size random values
*/
public static <T> List<T> randomSubsetOf(int size, T... values) {
if (size > values.length) {
throw new IllegalArgumentException("Can\'t pick " + size + " random objects from a list of " + values.length + " objects");
}
List<T> list = arrayAsArrayList(values);
Collections.shuffle(list, random());
return list.subList(0, size);
return randomSubsetOf(size, list);
}
/**
* Returns a random subset of values (including a potential empty list)
*/
public static <T> List<T> randomSubsetOf(Collection<T> collection) {
return randomSubsetOf(randomInt(collection.size() - 1), collection);
}
/**
* Returns size random values
*/
public static <T> List<T> randomSubsetOf(int size, Collection<T> collection) {
if (size > collection.size()) {
throw new IllegalArgumentException("Can\'t pick " + size + " random objects from a collection of " + collection.size() + " objects");
}
List<T> tempList = new ArrayList<>(collection);
Collections.shuffle(tempList, random());
return tempList.subList(0, size);
}
/**

View File

@ -39,13 +39,13 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeService;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.OperationRouting;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider;
import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
import org.elasticsearch.cluster.service.InternalClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.breaker.CircuitBreaker;
@ -591,7 +591,7 @@ public final class InternalTestCluster extends TestCluster {
.put(Environment.PATH_HOME_SETTING.getKey(), baseDir) // allow overriding path.home
.put(settings)
.put("node.name", name)
.put(InternalClusterService.NODE_ID_SEED_SETTING.getKey(), seed)
.put(DiscoveryNodeService.NODE_ID_SEED_SETTING.getKey(), seed)
.build();
MockNode node = new MockNode(finalSettings, version, plugins);
return new NodeAndClient(name, node);
@ -838,8 +838,8 @@ public final class InternalTestCluster extends TestCluster {
IOUtils.rm(nodeEnv.nodeDataPaths());
}
}
final long newIdSeed = InternalClusterService.NODE_ID_SEED_SETTING.get(node.settings()) + 1; // use a new seed to make sure we have new node id
Settings finalSettings = Settings.builder().put(node.settings()).put(newSettings).put(InternalClusterService.NODE_ID_SEED_SETTING.getKey(), newIdSeed).build();
final long newIdSeed = DiscoveryNodeService.NODE_ID_SEED_SETTING.get(node.settings()) + 1; // use a new seed to make sure we have new node id
Settings finalSettings = Settings.builder().put(node.settings()).put(newSettings).put(DiscoveryNodeService.NODE_ID_SEED_SETTING.getKey(), newIdSeed).build();
Collection<Class<? extends Plugin>> plugins = node.getPlugins();
Version version = node.getVersion();
node = new MockNode(finalSettings, version, plugins);

View File

@ -38,7 +38,6 @@ import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.tasks.TaskManager;
import java.util.List;
@ -153,11 +152,6 @@ public class NoopClusterService implements ClusterService {
return TimeValue.timeValueMillis(0);
}
@Override
public TaskManager getTaskManager() {
return null;
}
@Override
public Lifecycle.State lifecycleState() {
return null;

View File

@ -47,9 +47,7 @@ import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.Arrays;
import java.util.Iterator;
@ -62,7 +60,6 @@ import java.util.concurrent.ScheduledFuture;
public class TestClusterService implements ClusterService {
volatile ClusterState state;
private volatile TaskManager taskManager;
private final List<ClusterStateListener> listeners = new CopyOnWriteArrayList<>();
private final Queue<NotifyTimeout> onGoingTimeouts = ConcurrentCollections.newQueue();
private final ThreadPool threadPool;
@ -75,12 +72,6 @@ public class TestClusterService implements ClusterService {
public TestClusterService(ThreadPool threadPool) {
this(ClusterState.builder(new ClusterName("test")).build(), threadPool);
taskManager = new TaskManager(Settings.EMPTY);
}
public TestClusterService(ThreadPool threadPool, TransportService transportService) {
this(ClusterState.builder(new ClusterName("test")).build(), threadPool);
taskManager = transportService.getTaskManager();
}
public TestClusterService(ClusterState state) {
@ -243,11 +234,6 @@ public class TestClusterService implements ClusterService {
throw new UnsupportedOperationException();
}
@Override
public TaskManager getTaskManager() {
return taskManager;
}
@Override
public List<PendingClusterTask> pendingTasks() {
throw new UnsupportedOperationException();