From cd12241e9f76c01c98961afb4f5b874926cc4269 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Sat, 27 Feb 2016 18:48:42 +0100 Subject: [PATCH] Decouple the TransportService and ClusterService #16872 Currently, the cluster service is tightly coupled to the transport service by both managing node connections and requiring the bound address in order to create the local disco node. This commit introduces a new NodeConnectionsService which is in charge of node connection management and makes it possible to remove all network related calls from the cluster service. The local DiscoNode is now created by DiscoveryNodeService and is set both the cluster service and the transport service during node start up. Closes #16788 Closes #16872 --- .../elasticsearch/cluster/ClusterModule.java | 1 + .../elasticsearch/cluster/ClusterService.java | 6 - .../cluster/NodeConnectionsService.java | 156 ++++ .../cluster/node/DiscoveryNodeService.java | 25 +- .../service/InternalClusterService.java | 175 +--- .../common/settings/ClusterSettings.java | 6 +- .../common/util/concurrent/KeyedLock.java | 52 +- .../java/org/elasticsearch/node/Node.java | 18 + .../transport/netty/NettyTransport.java | 18 +- .../node/tasks/TaskManagerTestCase.java | 4 +- .../admin/cluster/node/tasks/TasksIT.java | 13 +- .../cluster/ClusterServiceIT.java | 708 --------------- .../cluster/NodeConnectionsServiceTests.java | 275 ++++++ ...rdFailedClusterStateTaskExecutorTests.java | 4 +- .../cluster/service/ClusterServiceTests.java | 824 ++++++++++++++++++ .../elasticsearch/test/MockLogAppender.java | 2 +- .../transport/netty/KeyedLockTests.java | 33 +- .../elasticsearch/tribe/TribeUnitTests.java | 6 +- .../org/elasticsearch/test/ESTestCase.java | 32 +- .../test/InternalTestCluster.java | 8 +- .../test/cluster/NoopClusterService.java | 6 - .../test/cluster/TestClusterService.java | 14 - 22 files changed, 1423 insertions(+), 963 deletions(-) create mode 100644 core/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java create mode 100644 core/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java create mode 100644 core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceTests.java diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java index 3e668191ff3..6d9273b2661 100644 --- a/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -136,6 +136,7 @@ public class ClusterModule extends AbstractModule { bind(AllocationService.class).asEagerSingleton(); bind(DiscoveryNodeService.class).asEagerSingleton(); bind(ClusterService.class).to(InternalClusterService.class).asEagerSingleton(); + bind(NodeConnectionsService.class).asEagerSingleton(); bind(OperationRouting.class).asEagerSingleton(); bind(MetaDataCreateIndexService.class).asEagerSingleton(); bind(MetaDataDeleteIndexService.class).asEagerSingleton(); diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterService.java b/core/src/main/java/org/elasticsearch/cluster/ClusterService.java index 27df4b9e96f..10d547afc5c 100644 --- a/core/src/main/java/org/elasticsearch/cluster/ClusterService.java +++ b/core/src/main/java/org/elasticsearch/cluster/ClusterService.java @@ -26,7 +26,6 @@ import org.elasticsearch.cluster.service.PendingClusterTask; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.tasks.TaskManager; import java.util.List; @@ -154,9 +153,4 @@ public interface ClusterService extends LifecycleComponent { * @return A zero time value if the queue is empty, otherwise the time value oldest task waiting in the queue */ TimeValue getMaxTaskWaitTime(); - - /** - * Returns task manager created in the cluster service - */ - TaskManager getTaskManager(); } diff --git a/core/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java b/core/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java new file mode 100644 index 00000000000..cce25652ed7 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java @@ -0,0 +1,156 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster; + +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.common.util.concurrent.FutureUtils; +import org.elasticsearch.common.util.concurrent.KeyedLock; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ScheduledFuture; + +/** + * This component is responsible for connecting to nodes once they are added to the cluster state, and disconnect when they are + * removed. Also, it periodically checks that all connections are still open and if needed restores them. + * Note that this component is *not* responsible for removing nodes from the cluster if they disconnect / do not respond + * to pings. This is done by {@link org.elasticsearch.discovery.zen.fd.NodesFaultDetection}. Master fault detection + * is done by {@link org.elasticsearch.discovery.zen.fd.MasterFaultDetection}. + */ +public class NodeConnectionsService extends AbstractLifecycleComponent { + + public static final Setting CLUSTER_NODE_RECONNECT_INTERVAL_SETTING = + Setting.positiveTimeSetting("cluster.nodes.reconnect_interval", TimeValue.timeValueSeconds(10), false, Setting.Scope.CLUSTER); + private final ThreadPool threadPool; + private final TransportService transportService; + + // map between current node and the number of failed connection attempts. 0 means successfully connected. + // if a node doesn't appear in this list it shouldn't be monitored + private ConcurrentMap nodes = ConcurrentCollections.newConcurrentMap(); + + final private KeyedLock nodeLocks = new KeyedLock<>(); + + private final TimeValue reconnectInterval; + + private volatile ScheduledFuture backgroundFuture = null; + + @Inject + public NodeConnectionsService(Settings settings, ThreadPool threadPool, TransportService transportService) { + super(settings); + this.threadPool = threadPool; + this.transportService = transportService; + this.reconnectInterval = NodeConnectionsService.CLUSTER_NODE_RECONNECT_INTERVAL_SETTING.get(settings); + } + + public void connectToAddedNodes(ClusterChangedEvent event) { + + // TODO: do this in parallel (and wait) + for (final DiscoveryNode node : event.nodesDelta().addedNodes()) { + try (Releasable ignored = nodeLocks.acquire(node)) { + Integer current = nodes.put(node, 0); + assert current == null : "node " + node + " was added in event but already in internal nodes"; + validateNodeConnected(node); + } + } + } + + public void disconnectFromRemovedNodes(ClusterChangedEvent event) { + for (final DiscoveryNode node : event.nodesDelta().removedNodes()) { + try (Releasable ignored = nodeLocks.acquire(node)) { + Integer current = nodes.remove(node); + assert current != null : "node " + node + " was removed in event but not in internal nodes"; + try { + transportService.disconnectFromNode(node); + } catch (Throwable e) { + logger.warn("failed to disconnect to node [" + node + "]", e); + } + } + } + } + + void validateNodeConnected(DiscoveryNode node) { + assert nodeLocks.isHeldByCurrentThread(node) : "validateNodeConnected must be called under lock"; + if (lifecycle.stoppedOrClosed() || + nodes.containsKey(node) == false) { // we double check existence of node since connectToNode might take time... + // nothing to do + } else { + try { + // connecting to an already connected node is a noop + transportService.connectToNode(node); + nodes.put(node, 0); + } catch (Exception e) { + Integer nodeFailureCount = nodes.get(node); + assert nodeFailureCount != null : node + " didn't have a counter in nodes map"; + nodeFailureCount = nodeFailureCount + 1; + // log every 6th failure + if ((nodeFailureCount % 6) == 1) { + logger.warn("failed to connect to node {} (tried [{}] times)", e, node, nodeFailureCount); + } + nodes.put(node, nodeFailureCount); + } + } + } + + class ConnectionChecker extends AbstractRunnable { + + @Override + public void onFailure(Throwable t) { + logger.warn("unexpected error while checking for node reconnects", t); + } + + protected void doRun() { + for (DiscoveryNode node : nodes.keySet()) { + try (Releasable ignored = nodeLocks.acquire(node)) { + validateNodeConnected(node); + } + } + } + + @Override + public void onAfter() { + if (lifecycle.started()) { + backgroundFuture = threadPool.schedule(reconnectInterval, ThreadPool.Names.GENERIC, this); + } + } + } + + @Override + protected void doStart() { + backgroundFuture = threadPool.schedule(reconnectInterval, ThreadPool.Names.GENERIC, new ConnectionChecker()); + } + + @Override + protected void doStop() { + FutureUtils.cancel(backgroundFuture); + } + + @Override + protected void doClose() { + + } +} diff --git a/core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodeService.java b/core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodeService.java index 83f603d2890..47c0e0052d3 100644 --- a/core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodeService.java +++ b/core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodeService.java @@ -19,24 +19,40 @@ package org.elasticsearch.cluster.node; +import org.elasticsearch.Version; +import org.elasticsearch.common.Randomness; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.TransportAddress; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.concurrent.CopyOnWriteArrayList; /** */ public class DiscoveryNodeService extends AbstractComponent { + public static final Setting NODE_ID_SEED_SETTING = + // don't use node.id.seed so it won't be seen as an attribute + Setting.longSetting("node_id.seed", 0L, Long.MIN_VALUE, false, Setting.Scope.CLUSTER); private final List customAttributesProviders = new CopyOnWriteArrayList<>(); + private final Version version; @Inject - public DiscoveryNodeService(Settings settings) { + public DiscoveryNodeService(Settings settings, Version version) { super(settings); + this.version = version; + } + + public static String generateNodeId(Settings settings) { + Random random = Randomness.get(settings, NODE_ID_SEED_SETTING); + return Strings.randomBase64UUID(random); } public DiscoveryNodeService addCustomAttributeProvider(CustomAttributesProvider customAttributesProvider) { @@ -44,7 +60,7 @@ public class DiscoveryNodeService extends AbstractComponent { return this; } - public Map buildAttributes() { + public DiscoveryNode buildLocalNode(TransportAddress publishAddress) { Map attributes = new HashMap<>(settings.getByPrefix("node.").getAsMap()); attributes.remove("name"); // name is extracted in other places if (attributes.containsKey("client")) { @@ -76,10 +92,11 @@ public class DiscoveryNodeService extends AbstractComponent { } } - return attributes; + final String nodeId = generateNodeId(settings); + return new DiscoveryNode(settings.get("node.name"), nodeId, publishAddress, attributes, version); } - public static interface CustomAttributesProvider { + public interface CustomAttributesProvider { Map buildAttributes(); } diff --git a/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java b/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java index 3d70ac84e33..7cd3d840fbc 100644 --- a/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java +++ b/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java @@ -19,7 +19,6 @@ package org.elasticsearch.cluster.service; -import org.elasticsearch.Version; import org.elasticsearch.cluster.AckedClusterStateTaskListener; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterName; @@ -32,19 +31,18 @@ import org.elasticsearch.cluster.ClusterStateTaskExecutor; import org.elasticsearch.cluster.ClusterStateTaskListener; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.LocalNodeMasterListener; +import org.elasticsearch.cluster.NodeConnectionsService; import org.elasticsearch.cluster.TimeoutClusterStateListener; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.ProcessClusterEventTimeoutException; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNodeService; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.OperationRouting; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Priority; -import org.elasticsearch.common.Randomness; import org.elasticsearch.common.Strings; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; @@ -54,7 +52,6 @@ import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.text.Text; -import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.CountDown; @@ -65,9 +62,7 @@ import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor; import org.elasticsearch.common.util.concurrent.PrioritizedRunnable; import org.elasticsearch.common.util.iterable.Iterables; import org.elasticsearch.discovery.Discovery; -import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportService; import java.util.ArrayList; import java.util.Collection; @@ -78,8 +73,6 @@ import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Queue; -import java.util.Random; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; import java.util.concurrent.Future; @@ -97,25 +90,15 @@ import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadF public class InternalClusterService extends AbstractLifecycleComponent implements ClusterService { public static final Setting CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING = Setting.positiveTimeSetting("cluster.service.slow_task_logging_threshold", TimeValue.timeValueSeconds(30), true, Setting.Scope.CLUSTER); - public static final Setting CLUSTER_SERVICE_RECONNECT_INTERVAL_SETTING = Setting.positiveTimeSetting("cluster.service.reconnect_interval", TimeValue.timeValueSeconds(10), false, Setting.Scope.CLUSTER); public static final String UPDATE_THREAD_NAME = "clusterService#updateTask"; - public static final Setting NODE_ID_SEED_SETTING = - // don't use node.id.seed so it won't be seen as an attribute - Setting.longSetting("node_id.seed", 0L, Long.MIN_VALUE, false, Setting.Scope.CLUSTER); private final ThreadPool threadPool; private BiConsumer clusterStatePublisher; private final OperationRouting operationRouting; - private final TransportService transportService; - private final ClusterSettings clusterSettings; - private final DiscoveryNodeService discoveryNodeService; - private final Version version; - - private final TimeValue reconnectInterval; private TimeValue slowTaskLoggingThreshold; @@ -140,47 +123,49 @@ public class InternalClusterService extends AbstractLifecycleComponent publisher) { + synchronized public void setClusterStatePublisher(BiConsumer publisher) { clusterStatePublisher = publisher; } + synchronized public void setLocalNode(DiscoveryNode localNode) { + assert clusterState.nodes().localNodeId() == null : "local node is already set"; + DiscoveryNodes.Builder nodeBuilder = DiscoveryNodes.builder(clusterState.nodes()).put(localNode).localNodeId(localNode.id()); + this.clusterState = ClusterState.builder(clusterState).nodes(nodeBuilder).build(); + } + + synchronized public void setNodeConnectionsService(NodeConnectionsService nodeConnectionsService) { + assert this.nodeConnectionsService == null : "nodeConnectionsService is already set"; + this.nodeConnectionsService = nodeConnectionsService; + } + @Override - public void addInitialStateBlock(ClusterBlock block) throws IllegalStateException { + synchronized public void addInitialStateBlock(ClusterBlock block) throws IllegalStateException { if (lifecycle.started()) { throw new IllegalStateException("can't set initial block when started"); } @@ -188,12 +173,12 @@ public class InternalClusterService extends AbstractLifecycleComponent nodeAttributes = discoveryNodeService.buildAttributes(); - // note, we rely on the fact that its a new id each time we start, see FD and "kill -9" handling - final String nodeId = generateNodeId(settings); - final TransportAddress publishAddress = transportService.boundAddress().publishAddress(); - DiscoveryNode localNode = new DiscoveryNode(settings.get("node.name"), nodeId, publishAddress, nodeAttributes, version); - DiscoveryNodes.Builder nodeBuilder = DiscoveryNodes.builder().put(localNode).localNodeId(localNode.id()); - this.clusterState = ClusterState.builder(clusterState).nodes(nodeBuilder).blocks(initialBlocks).build(); - this.transportService.setLocalNode(localNode); + this.clusterState = ClusterState.builder(clusterState).blocks(initialBlocks).build(); } @Override - protected void doStop() { - FutureUtils.cancel(this.reconnectToNodes); + synchronized protected void doStop() { for (NotifyTimeout onGoingTimeout : onGoingTimeouts) { onGoingTimeout.cancel(); onGoingTimeout.listener.onClose(); @@ -230,7 +207,7 @@ public class InternalClusterService extends AbstractLifecycleComponent batchResult; - long startTimeNS = System.nanoTime(); + long startTimeNS = currentTimeInNanos(); try { List inputs = toExecute.stream().map(tUpdateTask -> tUpdateTask.task).collect(Collectors.toList()); batchResult = executor.execute(previousClusterState, inputs); } catch (Throwable e) { - TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startTimeNS))); + TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS))); if (logger.isTraceEnabled()) { - StringBuilder sb = new StringBuilder("failed to execute cluster state update in ").append(executionTime).append(", state:\nversion [").append(previousClusterState.version()).append("], source [").append(source).append("]\n"); + StringBuilder sb = new StringBuilder("failed to execute cluster state update in [").append(executionTime).append("], state:\nversion [").append(previousClusterState.version()).append("], source [").append(source).append("]\n"); sb.append(previousClusterState.nodes().prettyPrint()); sb.append(previousClusterState.routingTable().prettyPrint()); sb.append(previousClusterState.getRoutingNodes().prettyPrint()); @@ -509,8 +481,8 @@ public class InternalClusterService extends AbstractLifecycleComponent slowTaskLoggingThreshold.getMillis()) { - logger.warn("cluster state update task [{}] took {} above the warn threshold of {}", source, executionTime, slowTaskLoggingThreshold); + logger.warn("cluster state update task [{}] took [{}] above the warn threshold of {}", source, executionTime, slowTaskLoggingThreshold); } } @@ -809,64 +770,6 @@ public class InternalClusterService extends AbstractLifecycleComponent failureCount = ConcurrentCollections.newConcurrentMap(); - - @Override - public void run() { - // master node will check against all nodes if its alive with certain discoveries implementations, - // but we can't rely on that, so we check on it as well - for (DiscoveryNode node : clusterState.nodes()) { - if (lifecycle.stoppedOrClosed()) { - return; - } - if (clusterState.nodes().nodeExists(node.id())) { // we double check existence of node since connectToNode might take time... - if (!transportService.nodeConnected(node)) { - try { - transportService.connectToNode(node); - } catch (Exception e) { - if (lifecycle.stoppedOrClosed()) { - return; - } - if (clusterState.nodes().nodeExists(node.id())) { // double check here as well, maybe its gone? - Integer nodeFailureCount = failureCount.get(node); - if (nodeFailureCount == null) { - nodeFailureCount = 1; - } else { - nodeFailureCount = nodeFailureCount + 1; - } - // log every 6th failure - if ((nodeFailureCount % 6) == 0) { - // reset the failure count... - nodeFailureCount = 0; - logger.warn("failed to reconnect to node {}", e, node); - } - failureCount.put(node, nodeFailureCount); - } - } - } - } - } - // go over and remove failed nodes that have been removed - DiscoveryNodes nodes = clusterState.nodes(); - for (Iterator failedNodesIt = failureCount.keySet().iterator(); failedNodesIt.hasNext(); ) { - DiscoveryNode failedNode = failedNodesIt.next(); - if (!nodes.nodeExists(failedNode.id())) { - failedNodesIt.remove(); - } - } - if (lifecycle.started()) { - reconnectToNodes = threadPool.schedule(reconnectInterval, ThreadPool.Names.GENERIC, this); - } - } - } - - public static String generateNodeId(Settings settings) { - Random random = Randomness.get(settings, NODE_ID_SEED_SETTING); - return Strings.randomBase64UUID(random); - } - private static class LocalNodeMasterListeners implements ClusterStateListener { private final List listeners = new CopyOnWriteArrayList<>(); diff --git a/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index fa8b8c4ac41..3215f3db05a 100644 --- a/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -29,8 +29,10 @@ import org.elasticsearch.client.transport.TransportClientNodesService; import org.elasticsearch.cluster.ClusterModule; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.InternalClusterInfoService; +import org.elasticsearch.cluster.NodeConnectionsService; import org.elasticsearch.cluster.action.index.MappingUpdatedAction; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNodeService; import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; import org.elasticsearch.cluster.routing.allocation.decider.AwarenessAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; @@ -259,7 +261,7 @@ public final class ClusterSettings extends AbstractScopedSettings { TransportService.TRACE_LOG_INCLUDE_SETTING, TransportCloseIndexAction.CLUSTER_INDICES_CLOSE_ENABLE_SETTING, ShardsLimitAllocationDecider.CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING, - InternalClusterService.CLUSTER_SERVICE_RECONNECT_INTERVAL_SETTING, + NodeConnectionsService.CLUSTER_NODE_RECONNECT_INTERVAL_SETTING, HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_TYPE_SETTING, HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_TYPE_SETTING, Transport.TRANSPORT_TCP_COMPRESS, @@ -326,7 +328,7 @@ public final class ClusterSettings extends AbstractScopedSettings { Environment.PATH_SCRIPTS_SETTING, Environment.PATH_SHARED_DATA_SETTING, Environment.PIDFILE_SETTING, - InternalClusterService.NODE_ID_SEED_SETTING, + DiscoveryNodeService.NODE_ID_SEED_SETTING, DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING, DiscoveryModule.DISCOVERY_TYPE_SETTING, DiscoveryModule.ZEN_MASTER_SERVICE_TYPE_SETTING, diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/KeyedLock.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/KeyedLock.java index 83bb9fd690d..5c30330c156 100644 --- a/core/src/main/java/org/elasticsearch/common/util/concurrent/KeyedLock.java +++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/KeyedLock.java @@ -20,7 +20,10 @@ package org.elasticsearch.common.util.concurrent; +import org.elasticsearch.common.lease.Releasable; + import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; @@ -29,9 +32,8 @@ import java.util.concurrent.locks.ReentrantLock; * created the first time they are acquired and removed if no thread hold the * lock. The latter is important to assure that the list of locks does not grow * infinitely. - * - * A Thread can acquire a lock only once. - * + * + * * */ public class KeyedLock { @@ -50,48 +52,38 @@ public class KeyedLock { private final ConcurrentMap map = ConcurrentCollections.newConcurrentMap(); - protected final ThreadLocal threadLocal = new ThreadLocal<>(); - - public void acquire(T key) { + public Releasable acquire(T key) { + assert isHeldByCurrentThread(key) == false : "lock for " + key + " is already heald by this thread"; while (true) { - if (threadLocal.get() != null) { - // if we are here, the thread already has the lock - throw new IllegalStateException("Lock already acquired in Thread" + Thread.currentThread().getId() - + " for key " + key); - } KeyLock perNodeLock = map.get(key); if (perNodeLock == null) { KeyLock newLock = new KeyLock(fair); perNodeLock = map.putIfAbsent(key, newLock); if (perNodeLock == null) { newLock.lock(); - threadLocal.set(newLock); - return; + return new ReleasableLock(key, newLock); } } assert perNodeLock != null; int i = perNodeLock.count.get(); if (i > 0 && perNodeLock.count.compareAndSet(i, i + 1)) { perNodeLock.lock(); - threadLocal.set(perNodeLock); - return; + return new ReleasableLock(key, perNodeLock); } } } - public void release(T key) { - KeyLock lock = threadLocal.get(); + public boolean isHeldByCurrentThread(T key) { + KeyLock lock = map.get(key); if (lock == null) { - throw new IllegalStateException("Lock not acquired"); + return false; } - release(key, lock); + return lock.isHeldByCurrentThread(); } void release(T key, KeyLock lock) { - assert lock.isHeldByCurrentThread(); assert lock == map.get(key); lock.unlock(); - threadLocal.set(null); int decrementAndGet = lock.count.decrementAndGet(); if (decrementAndGet == 0) { map.remove(key, lock); @@ -99,6 +91,24 @@ public class KeyedLock { } + private final class ReleasableLock implements Releasable { + final T key; + final KeyLock lock; + final AtomicBoolean closed = new AtomicBoolean(); + + private ReleasableLock(T key, KeyLock lock) { + this.key = key; + this.lock = lock; + } + + @Override + public void close() { + if (closed.compareAndSet(false, true)) { + release(key, lock); + } + } + } + @SuppressWarnings("serial") private final static class KeyLock extends ReentrantLock { KeyLock(boolean fair) { diff --git a/core/src/main/java/org/elasticsearch/node/Node.java b/core/src/main/java/org/elasticsearch/node/Node.java index e279d3e819f..b995723127a 100644 --- a/core/src/main/java/org/elasticsearch/node/Node.java +++ b/core/src/main/java/org/elasticsearch/node/Node.java @@ -34,8 +34,10 @@ import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.MasterNodeChangePredicate; +import org.elasticsearch.cluster.NodeConnectionsService; import org.elasticsearch.cluster.action.index.MappingUpdatedAction; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodeService; import org.elasticsearch.cluster.routing.RoutingService; import org.elasticsearch.cluster.service.InternalClusterService; import org.elasticsearch.common.StopWatch; @@ -294,6 +296,10 @@ public class Node implements Closeable { "node cluster service implementation must inherit from InternalClusterService"; final InternalClusterService clusterService = (InternalClusterService) injector.getInstance(ClusterService.class); + final NodeConnectionsService nodeConnectionsService = injector.getInstance(NodeConnectionsService.class); + nodeConnectionsService.start(); + clusterService.setNodeConnectionsService(nodeConnectionsService); + // TODO hack around circular dependencies problems injector.getInstance(GatewayAllocator.class).setReallocation(clusterService, injector.getInstance(RoutingService.class)); @@ -311,6 +317,15 @@ public class Node implements Closeable { // Start the transport service now so the publish address will be added to the local disco node in ClusterService TransportService transportService = injector.getInstance(TransportService.class); transportService.start(); + DiscoveryNode localNode = injector.getInstance(DiscoveryNodeService.class) + .buildLocalNode(transportService.boundAddress().publishAddress()); + + // TODO: need to find a cleaner way to start/construct a service with some initial parameters, + // playing nice with the life cycle interfaces + clusterService.setLocalNode(localNode); + transportService.setLocalNode(localNode); + clusterService.add(transportService.getTaskManager()); + clusterService.start(); // start after cluster service so the local disco is known @@ -392,6 +407,7 @@ public class Node implements Closeable { injector.getInstance(RoutingService.class).stop(); injector.getInstance(ClusterService.class).stop(); injector.getInstance(Discovery.class).stop(); + injector.getInstance(NodeConnectionsService.class).stop(); injector.getInstance(MonitorService.class).stop(); injector.getInstance(GatewayService.class).stop(); injector.getInstance(SearchService.class).stop(); @@ -449,6 +465,8 @@ public class Node implements Closeable { toClose.add(injector.getInstance(RoutingService.class)); toClose.add(() -> stopWatch.stop().start("cluster")); toClose.add(injector.getInstance(ClusterService.class)); + toClose.add(() -> stopWatch.stop().start("node_connections_service")); + toClose.add(injector.getInstance(NodeConnectionsService.class)); toClose.add(() -> stopWatch.stop().start("discovery")); toClose.add(injector.getInstance(Discovery.class)); toClose.add(() -> stopWatch.stop().start("monitor")); diff --git a/core/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java b/core/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java index dc9dd70ab8d..27ba643ef71 100644 --- a/core/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java +++ b/core/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java @@ -33,6 +33,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.math.MathUtils; import org.elasticsearch.common.metrics.CounterMetric; @@ -943,8 +944,8 @@ public class NettyTransport extends AbstractLifecycleComponent implem } globalLock.readLock().lock(); try { - connectionLock.acquire(node.id()); - try { + + try (Releasable ignored = connectionLock.acquire(node.id())) { if (!lifecycle.started()) { throw new IllegalStateException("can't add nodes to a stopped transport"); } @@ -979,8 +980,6 @@ public class NettyTransport extends AbstractLifecycleComponent implem } catch (Exception e) { throw new ConnectTransportException(node, "general node connection failure", e); } - } finally { - connectionLock.release(node.id()); } } finally { globalLock.readLock().unlock(); @@ -1103,8 +1102,8 @@ public class NettyTransport extends AbstractLifecycleComponent implem @Override public void disconnectFromNode(DiscoveryNode node) { - connectionLock.acquire(node.id()); - try { + + try (Releasable ignored = connectionLock.acquire(node.id())) { NodeChannels nodeChannels = connectedNodes.remove(node); if (nodeChannels != null) { try { @@ -1115,8 +1114,6 @@ public class NettyTransport extends AbstractLifecycleComponent implem transportServiceAdapter.raiseNodeDisconnected(node); } } - } finally { - connectionLock.release(node.id()); } } @@ -1128,8 +1125,7 @@ public class NettyTransport extends AbstractLifecycleComponent implem // check outside of the lock NodeChannels nodeChannels = connectedNodes.get(node); if (nodeChannels != null && nodeChannels.hasChannel(channel)) { - connectionLock.acquire(node.id()); - try { + try (Releasable ignored = connectionLock.acquire(node.id())) { nodeChannels = connectedNodes.get(node); // check again within the connection lock, if its still applicable to remove it if (nodeChannels != null && nodeChannels.hasChannel(channel)) { @@ -1143,8 +1139,6 @@ public class NettyTransport extends AbstractLifecycleComponent implem } return true; } - } finally { - connectionLock.release(node.id()); } } return false; diff --git a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java index 4dcf54b5d0b..f5d8637571a 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java +++ b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java @@ -194,7 +194,7 @@ public abstract class TaskManagerTestCase extends ESTestCase { } }; transportService.start(); - clusterService = new TestClusterService(threadPool, transportService); + clusterService = new TestClusterService(threadPool); clusterService.add(transportService.getTaskManager()); discoveryNode = new DiscoveryNode(name, transportService.boundAddress().publishAddress(), Version.CURRENT); IndexNameExpressionResolver indexNameExpressionResolver = new IndexNameExpressionResolver(settings); @@ -238,7 +238,7 @@ public abstract class TaskManagerTestCase extends ESTestCase { RecordingTaskManagerListener[] listeners = new RecordingTaskManagerListener[nodes.length]; for (int i = 0; i < nodes.length; i++) { listeners[i] = new RecordingTaskManagerListener(nodes[i].discoveryNode, actionMasks); - ((MockTaskManager) (nodes[i].clusterService.getTaskManager())).addListener(listeners[i]); + ((MockTaskManager) (nodes[i].transportService.getTaskManager())).addListener(listeners[i]); } return listeners; } diff --git a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java index 8c791a99018..7c2747a1a28 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java +++ b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java @@ -43,6 +43,7 @@ import org.elasticsearch.test.tasks.MockTaskManager; import org.elasticsearch.test.tasks.MockTaskManagerListener; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.transport.ReceiveTimeoutTransportException; +import org.elasticsearch.transport.TransportService; import java.io.IOException; import java.util.ArrayList; @@ -263,8 +264,8 @@ public class TasksIT extends ESIntegTestCase { ReentrantLock taskFinishLock = new ReentrantLock(); taskFinishLock.lock(); CountDownLatch taskRegistered = new CountDownLatch(1); - for (ClusterService clusterService : internalCluster().getInstances(ClusterService.class)) { - ((MockTaskManager)clusterService.getTaskManager()).addListener(new MockTaskManagerListener() { + for (TransportService transportService : internalCluster().getInstances(TransportService.class)) { + ((MockTaskManager) transportService.getTaskManager()).addListener(new MockTaskManagerListener() { @Override public void onTaskRegistered(Task task) { if (task.getAction().startsWith(IndexAction.NAME)) { @@ -408,7 +409,7 @@ public class TasksIT extends ESIntegTestCase { @Override public void tearDown() throws Exception { for (Map.Entry, RecordingTaskManagerListener> entry : listeners.entrySet()) { - ((MockTaskManager)internalCluster().getInstance(ClusterService.class, entry.getKey().v1()).getTaskManager()).removeListener(entry.getValue()); + ((MockTaskManager) internalCluster().getInstance(TransportService.class, entry.getKey().v1()).getTaskManager()).removeListener(entry.getValue()); } listeners.clear(); super.tearDown(); @@ -418,10 +419,10 @@ public class TasksIT extends ESIntegTestCase { * Registers recording task event listeners with the given action mask on all nodes */ private void registerTaskManageListeners(String actionMasks) { - for (ClusterService clusterService : internalCluster().getInstances(ClusterService.class)) { - DiscoveryNode node = clusterService.localNode(); + for (String nodeName : internalCluster().getNodeNames()) { + DiscoveryNode node = internalCluster().getInstance(ClusterService.class, nodeName).localNode(); RecordingTaskManagerListener listener = new RecordingTaskManagerListener(node, Strings.splitStringToArray(actionMasks, ',')); - ((MockTaskManager)clusterService.getTaskManager()).addListener(listener); + ((MockTaskManager) internalCluster().getInstance(TransportService.class, nodeName).getTaskManager()).addListener(listener); RecordingTaskManagerListener oldListener = listeners.put(new Tuple<>(node.name(), actionMasks), listener); assertNull(oldListener); } diff --git a/core/src/test/java/org/elasticsearch/cluster/ClusterServiceIT.java b/core/src/test/java/org/elasticsearch/cluster/ClusterServiceIT.java index f5c99fd5f7e..813557e314b 100644 --- a/core/src/test/java/org/elasticsearch/cluster/ClusterServiceIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/ClusterServiceIT.java @@ -18,16 +18,12 @@ */ package org.elasticsearch.cluster; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.service.InternalClusterService; import org.elasticsearch.cluster.service.PendingClusterTask; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Priority; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.common.inject.Inject; @@ -35,38 +31,24 @@ import org.elasticsearch.common.inject.Singleton; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.discovery.zen.ZenDiscovery; -import org.elasticsearch.node.Node; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.Scope; -import org.elasticsearch.test.InternalTestCluster; -import org.elasticsearch.test.MockLogAppender; import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.threadpool.ThreadPool; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.HashMap; import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Set; -import java.util.concurrent.BrokenBarrierException; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.CyclicBarrier; -import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import static org.elasticsearch.common.settings.Settings.settingsBuilder; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; -import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -85,74 +67,6 @@ public class ClusterServiceIT extends ESIntegTestCase { return pluginList(TestPlugin.class); } - public void testTimeoutUpdateTask() throws Exception { - Settings settings = settingsBuilder() - .put("discovery.type", "local") - .build(); - internalCluster().startNode(settings); - ClusterService clusterService1 = internalCluster().getInstance(ClusterService.class); - final CountDownLatch block = new CountDownLatch(1); - clusterService1.submitStateUpdateTask("test1", new ClusterStateUpdateTask() { - @Override - public ClusterState execute(ClusterState currentState) { - try { - block.await(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - return currentState; - } - - @Override - public void onFailure(String source, Throwable t) { - throw new RuntimeException(t); - } - }); - - final CountDownLatch timedOut = new CountDownLatch(1); - final AtomicBoolean executeCalled = new AtomicBoolean(); - clusterService1.submitStateUpdateTask("test2", new ClusterStateUpdateTask() { - @Override - public TimeValue timeout() { - return TimeValue.timeValueMillis(2); - } - - @Override - public void onFailure(String source, Throwable t) { - timedOut.countDown(); - } - - @Override - public ClusterState execute(ClusterState currentState) { - executeCalled.set(true); - return currentState; - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - } - }); - - timedOut.await(); - block.countDown(); - final CountDownLatch allProcessed = new CountDownLatch(1); - clusterService1.submitStateUpdateTask("test3", new ClusterStateUpdateTask() { - @Override - public void onFailure(String source, Throwable t) { - throw new RuntimeException(t); - } - - @Override - public ClusterState execute(ClusterState currentState) { - allProcessed.countDown(); - return currentState; - } - - }); - allProcessed.await(); // executed another task to double check that execute on the timed out update task is not called... - assertThat(executeCalled.get(), equalTo(false)); - } - public void testAckedUpdateTask() throws Exception { Settings settings = settingsBuilder() .put("discovery.type", "local") @@ -299,63 +213,6 @@ public class ClusterServiceIT extends ESIntegTestCase { assertThat(processedLatch.await(1, TimeUnit.SECONDS), equalTo(true)); } - - public void testMasterAwareExecution() throws Exception { - Settings settings = settingsBuilder() - .put("discovery.type", "local") - .build(); - - InternalTestCluster.Async master = internalCluster().startNodeAsync(settings); - InternalTestCluster.Async nonMaster = internalCluster().startNodeAsync(settingsBuilder().put(settings).put(Node.NODE_MASTER_SETTING.getKey(), false).build()); - master.get(); - ensureGreen(); // make sure we have a cluster - - ClusterService clusterService = internalCluster().getInstance(ClusterService.class, nonMaster.get()); - - final boolean[] taskFailed = {false}; - final CountDownLatch latch1 = new CountDownLatch(1); - clusterService.submitStateUpdateTask("test", new ClusterStateUpdateTask() { - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - latch1.countDown(); - return currentState; - } - - @Override - public void onFailure(String source, Throwable t) { - taskFailed[0] = true; - latch1.countDown(); - } - }); - - latch1.await(); - assertTrue("cluster state update task was executed on a non-master", taskFailed[0]); - - taskFailed[0] = true; - final CountDownLatch latch2 = new CountDownLatch(1); - clusterService.submitStateUpdateTask("test", new ClusterStateUpdateTask() { - @Override - public boolean runOnlyOnMaster() { - return false; - } - - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - taskFailed[0] = false; - latch2.countDown(); - return currentState; - } - - @Override - public void onFailure(String source, Throwable t) { - taskFailed[0] = true; - latch2.countDown(); - } - }); - latch2.await(); - assertFalse("non-master cluster state update task was not executed", taskFailed[0]); - } - public void testAckedUpdateTaskNoAckExpected() throws Exception { Settings settings = settingsBuilder() .put("discovery.type", "local") @@ -715,571 +572,6 @@ public class ClusterServiceIT extends ESIntegTestCase { } } - /** - * Note, this test can only work as long as we have a single thread executor executing the state update tasks! - */ - public void testPrioritizedTasks() throws Exception { - Settings settings = settingsBuilder() - .put("discovery.type", "local") - .build(); - internalCluster().startNode(settings); - ClusterService clusterService = internalCluster().getInstance(ClusterService.class); - BlockingTask block = new BlockingTask(Priority.IMMEDIATE); - clusterService.submitStateUpdateTask("test", block); - int taskCount = randomIntBetween(5, 20); - Priority[] priorities = Priority.values(); - - // will hold all the tasks in the order in which they were executed - List tasks = new ArrayList<>(taskCount); - CountDownLatch latch = new CountDownLatch(taskCount); - for (int i = 0; i < taskCount; i++) { - Priority priority = priorities[randomIntBetween(0, priorities.length - 1)]; - clusterService.submitStateUpdateTask("test", new PrioritizedTask(priority, latch, tasks)); - } - - block.release(); - latch.await(); - - Priority prevPriority = null; - for (PrioritizedTask task : tasks) { - if (prevPriority == null) { - prevPriority = task.priority(); - } else { - assertThat(task.priority().sameOrAfter(prevPriority), is(true)); - } - } - } - - /* - * test that a listener throwing an exception while handling a - * notification does not prevent publication notification to the - * executor - */ - public void testClusterStateTaskListenerThrowingExceptionIsOkay() throws InterruptedException { - Settings settings = settingsBuilder() - .put("discovery.type", "local") - .build(); - internalCluster().startNode(settings); - ClusterService clusterService = internalCluster().getInstance(ClusterService.class); - - final CountDownLatch latch = new CountDownLatch(1); - AtomicBoolean published = new AtomicBoolean(); - - clusterService.submitStateUpdateTask( - "testClusterStateTaskListenerThrowingExceptionIsOkay", - new Object(), - ClusterStateTaskConfig.build(Priority.NORMAL), - new ClusterStateTaskExecutor() { - @Override - public boolean runOnlyOnMaster() { - return false; - } - - @Override - public BatchResult execute(ClusterState currentState, List tasks) throws Exception { - ClusterState newClusterState = ClusterState.builder(currentState).build(); - return BatchResult.builder().successes(tasks).build(newClusterState); - } - - @Override - public void clusterStatePublished(ClusterState newClusterState) { - published.set(true); - latch.countDown(); - } - }, - new ClusterStateTaskListener() { - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - throw new IllegalStateException(source); - } - - @Override - public void onFailure(String source, Throwable t) { - } - } - ); - - latch.await(); - assertTrue(published.get()); - } - - // test that for a single thread, tasks are executed in the order - // that they are submitted - public void testClusterStateUpdateTasksAreExecutedInOrder() throws BrokenBarrierException, InterruptedException { - Settings settings = settingsBuilder() - .put("discovery.type", "local") - .build(); - internalCluster().startNode(settings); - ClusterService clusterService = internalCluster().getInstance(ClusterService.class); - - class TaskExecutor implements ClusterStateTaskExecutor { - List tasks = new ArrayList<>(); - - @Override - public BatchResult execute(ClusterState currentState, List tasks) throws Exception { - this.tasks.addAll(tasks); - return BatchResult.builder().successes(tasks).build(ClusterState.builder(currentState).build()); - } - - @Override - public boolean runOnlyOnMaster() { - return false; - } - } - - int numberOfThreads = randomIntBetween(2, 8); - TaskExecutor[] executors = new TaskExecutor[numberOfThreads]; - for (int i = 0; i < numberOfThreads; i++) { - executors[i] = new TaskExecutor(); - } - - int tasksSubmittedPerThread = randomIntBetween(2, 1024); - - CopyOnWriteArrayList> failures = new CopyOnWriteArrayList<>(); - CountDownLatch updateLatch = new CountDownLatch(numberOfThreads * tasksSubmittedPerThread); - - ClusterStateTaskListener listener = new ClusterStateTaskListener() { - @Override - public void onFailure(String source, Throwable t) { - logger.error("unexpected failure: [{}]", t, source); - failures.add(new Tuple<>(source, t)); - updateLatch.countDown(); - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - updateLatch.countDown(); - } - }; - - CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads); - - for (int i = 0; i < numberOfThreads; i++) { - final int index = i; - Thread thread = new Thread(() -> { - try { - barrier.await(); - for (int j = 0; j < tasksSubmittedPerThread; j++) { - clusterService.submitStateUpdateTask("[" + index + "][" + j + "]", j, ClusterStateTaskConfig.build(randomFrom(Priority.values())), executors[index], listener); - } - barrier.await(); - } catch (InterruptedException | BrokenBarrierException e) { - throw new AssertionError(e); - } - }); - thread.start(); - } - - // wait for all threads to be ready - barrier.await(); - // wait for all threads to finish - barrier.await(); - - updateLatch.await(); - - assertThat(failures, empty()); - - for (int i = 0; i < numberOfThreads; i++) { - assertEquals(tasksSubmittedPerThread, executors[i].tasks.size()); - for (int j = 0; j < tasksSubmittedPerThread; j++) { - assertNotNull(executors[i].tasks.get(j)); - assertEquals("cluster state update task executed out of order", j, (int)executors[i].tasks.get(j)); - } - } - } - - public void testClusterStateBatchedUpdates() throws BrokenBarrierException, InterruptedException { - Settings settings = settingsBuilder() - .put("discovery.type", "local") - .build(); - internalCluster().startNode(settings); - ClusterService clusterService = internalCluster().getInstance(ClusterService.class); - - AtomicInteger counter = new AtomicInteger(); - class Task { - private AtomicBoolean state = new AtomicBoolean(); - - public void execute() { - if (!state.compareAndSet(false, true)) { - throw new IllegalStateException(); - } else { - counter.incrementAndGet(); - } - } - } - - int numberOfThreads = randomIntBetween(2, 8); - int tasksSubmittedPerThread = randomIntBetween(1, 1024); - int numberOfExecutors = Math.max(1, numberOfThreads / 4); - final Semaphore semaphore = new Semaphore(numberOfExecutors); - - class TaskExecutor implements ClusterStateTaskExecutor { - private AtomicInteger counter = new AtomicInteger(); - private AtomicInteger batches = new AtomicInteger(); - private AtomicInteger published = new AtomicInteger(); - - @Override - public BatchResult execute(ClusterState currentState, List tasks) throws Exception { - tasks.forEach(task -> task.execute()); - counter.addAndGet(tasks.size()); - ClusterState maybeUpdatedClusterState = currentState; - if (randomBoolean()) { - maybeUpdatedClusterState = ClusterState.builder(currentState).build(); - batches.incrementAndGet(); - semaphore.acquire(); - } - return BatchResult.builder().successes(tasks).build(maybeUpdatedClusterState); - } - - @Override - public boolean runOnlyOnMaster() { - return false; - } - - @Override - public void clusterStatePublished(ClusterState newClusterState) { - published.incrementAndGet(); - semaphore.release(); - } - } - - ConcurrentMap counters = new ConcurrentHashMap<>(); - CountDownLatch updateLatch = new CountDownLatch(numberOfThreads * tasksSubmittedPerThread); - ClusterStateTaskListener listener = new ClusterStateTaskListener() { - @Override - public void onFailure(String source, Throwable t) { - assert false; - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - counters.computeIfAbsent(source, key -> new AtomicInteger()).incrementAndGet(); - updateLatch.countDown(); - } - }; - - List executors = new ArrayList<>(); - for (int i = 0; i < numberOfExecutors; i++) { - executors.add(new TaskExecutor()); - } - - // randomly assign tasks to executors - List assignments = new ArrayList<>(); - for (int i = 0; i < numberOfThreads; i++) { - for (int j = 0; j < tasksSubmittedPerThread; j++) { - assignments.add(randomFrom(executors)); - } - } - - Map counts = new HashMap<>(); - for (TaskExecutor executor : assignments) { - counts.merge(executor, 1, (previous, one) -> previous + one); - } - - CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads); - for (int i = 0; i < numberOfThreads; i++) { - final int index = i; - Thread thread = new Thread(() -> { - try { - barrier.await(); - for (int j = 0; j < tasksSubmittedPerThread; j++) { - ClusterStateTaskExecutor executor = assignments.get(index * tasksSubmittedPerThread + j); - clusterService.submitStateUpdateTask( - Thread.currentThread().getName(), - new Task(), - ClusterStateTaskConfig.build(randomFrom(Priority.values())), - executor, - listener); - } - barrier.await(); - } catch (BrokenBarrierException | InterruptedException e) { - throw new AssertionError(e); - } - }); - thread.start(); - } - - // wait for all threads to be ready - barrier.await(); - // wait for all threads to finish - barrier.await(); - - // wait until all the cluster state updates have been processed - updateLatch.await(); - // and until all of the publication callbacks have completed - semaphore.acquire(numberOfExecutors); - - // assert the number of executed tasks is correct - assertEquals(numberOfThreads * tasksSubmittedPerThread, counter.get()); - - // assert each executor executed the correct number of tasks - for (TaskExecutor executor : executors) { - if (counts.containsKey(executor)) { - assertEquals((int) counts.get(executor), executor.counter.get()); - assertEquals(executor.batches.get(), executor.published.get()); - } - } - - // assert the correct number of clusterStateProcessed events were triggered - for (Map.Entry entry : counters.entrySet()) { - assertEquals(entry.getValue().get(), tasksSubmittedPerThread); - } - } - - @TestLogging("cluster:TRACE") // To ensure that we log cluster state events on TRACE level - public void testClusterStateUpdateLogging() throws Exception { - Settings settings = settingsBuilder() - .put("discovery.type", "local") - .build(); - internalCluster().startNode(settings); - ClusterService clusterService1 = internalCluster().getInstance(ClusterService.class); - MockLogAppender mockAppender = new MockLogAppender(); - mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test1", "cluster.service", Level.DEBUG, "*processing [test1]: took * no change in cluster_state")); - mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test2", "cluster.service", Level.TRACE, "*failed to execute cluster state update in *")); - mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test3", "cluster.service", Level.DEBUG, "*processing [test3]: took * done applying updated cluster_state (version: *, uuid: *)")); - - Logger rootLogger = Logger.getRootLogger(); - rootLogger.addAppender(mockAppender); - try { - final CountDownLatch latch = new CountDownLatch(4); - clusterService1.submitStateUpdateTask("test1", new ClusterStateUpdateTask() { - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - return currentState; - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - latch.countDown(); - } - - @Override - public void onFailure(String source, Throwable t) { - fail(); - } - }); - clusterService1.submitStateUpdateTask("test2", new ClusterStateUpdateTask() { - @Override - public ClusterState execute(ClusterState currentState) { - throw new IllegalArgumentException("Testing handling of exceptions in the cluster state task"); - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - fail(); - } - - @Override - public void onFailure(String source, Throwable t) { - latch.countDown(); - } - }); - clusterService1.submitStateUpdateTask("test3", new ClusterStateUpdateTask() { - @Override - public ClusterState execute(ClusterState currentState) { - return ClusterState.builder(currentState).incrementVersion().build(); - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - latch.countDown(); - } - - @Override - public void onFailure(String source, Throwable t) { - fail(); - } - }); - // Additional update task to make sure all previous logging made it to the logger - // We don't check logging for this on since there is no guarantee that it will occur before our check - clusterService1.submitStateUpdateTask("test4", new ClusterStateUpdateTask() { - @Override - public ClusterState execute(ClusterState currentState) { - return currentState; - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - latch.countDown(); - } - - @Override - public void onFailure(String source, Throwable t) { - fail(); - } - }); - assertThat(latch.await(1, TimeUnit.SECONDS), equalTo(true)); - } finally { - rootLogger.removeAppender(mockAppender); - } - mockAppender.assertAllExpectationsMatched(); - } - - @TestLogging("cluster:WARN") // To ensure that we log cluster state events on WARN level - public void testLongClusterStateUpdateLogging() throws Exception { - Settings settings = settingsBuilder() - .put("discovery.type", "local") - .put(InternalClusterService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.getKey(), "10s") - .build(); - internalCluster().startNode(settings); - ClusterService clusterService1 = internalCluster().getInstance(ClusterService.class); - MockLogAppender mockAppender = new MockLogAppender(); - mockAppender.addExpectation(new MockLogAppender.UnseenEventExpectation("test1 shouldn't see because setting is too low", "cluster.service", Level.WARN, "*cluster state update task [test1] took * above the warn threshold of *")); - mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test2", "cluster.service", Level.WARN, "*cluster state update task [test2] took * above the warn threshold of 10ms")); - mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test3", "cluster.service", Level.WARN, "*cluster state update task [test3] took * above the warn threshold of 10ms")); - mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test4", "cluster.service", Level.WARN, "*cluster state update task [test4] took * above the warn threshold of 10ms")); - - Logger rootLogger = Logger.getRootLogger(); - rootLogger.addAppender(mockAppender); - try { - final CountDownLatch latch = new CountDownLatch(5); - final CountDownLatch processedFirstTask = new CountDownLatch(1); - clusterService1.submitStateUpdateTask("test1", new ClusterStateUpdateTask() { - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - Thread.sleep(100); - return currentState; - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - latch.countDown(); - processedFirstTask.countDown(); - } - - @Override - public void onFailure(String source, Throwable t) { - fail(); - } - }); - - processedFirstTask.await(1, TimeUnit.SECONDS); - assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder() - .put(InternalClusterService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.getKey(), "10ms"))); - - clusterService1.submitStateUpdateTask("test2", new ClusterStateUpdateTask() { - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - Thread.sleep(100); - throw new IllegalArgumentException("Testing handling of exceptions in the cluster state task"); - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - fail(); - } - - @Override - public void onFailure(String source, Throwable t) { - latch.countDown(); - } - }); - clusterService1.submitStateUpdateTask("test3", new ClusterStateUpdateTask() { - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - Thread.sleep(100); - return ClusterState.builder(currentState).incrementVersion().build(); - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - latch.countDown(); - } - - @Override - public void onFailure(String source, Throwable t) { - fail(); - } - }); - clusterService1.submitStateUpdateTask("test4", new ClusterStateUpdateTask() { - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - Thread.sleep(100); - return currentState; - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - latch.countDown(); - } - - @Override - public void onFailure(String source, Throwable t) { - fail(); - } - }); - // Additional update task to make sure all previous logging made it to the logger - // We don't check logging for this on since there is no guarantee that it will occur before our check - clusterService1.submitStateUpdateTask("test5", new ClusterStateUpdateTask() { - @Override - public ClusterState execute(ClusterState currentState) { - return currentState; - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - latch.countDown(); - } - - @Override - public void onFailure(String source, Throwable t) { - fail(); - } - }); - assertThat(latch.await(5, TimeUnit.SECONDS), equalTo(true)); - } finally { - rootLogger.removeAppender(mockAppender); - } - mockAppender.assertAllExpectationsMatched(); - } - - private static class BlockingTask extends ClusterStateUpdateTask { - private final CountDownLatch latch = new CountDownLatch(1); - - public BlockingTask(Priority priority) { - super(priority); - } - - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - latch.await(); - return currentState; - } - - @Override - public void onFailure(String source, Throwable t) { - } - - public void release() { - latch.countDown(); - } - - } - - private static class PrioritizedTask extends ClusterStateUpdateTask { - - private final CountDownLatch latch; - private final List tasks; - - private PrioritizedTask(Priority priority, CountDownLatch latch, List tasks) { - super(priority); - this.latch = latch; - this.tasks = tasks; - } - - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - tasks.add(this); - latch.countDown(); - return currentState; - } - - @Override - public void onFailure(String source, Throwable t) { - latch.countDown(); - } - } - public static class TestPlugin extends Plugin { @Override diff --git a/core/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java b/core/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java new file mode 100644 index 00000000000..84c9e9f07a0 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java @@ -0,0 +1,275 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.component.Lifecycle; +import org.elasticsearch.common.component.LifecycleListener; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.BoundTransportAddress; +import org.elasticsearch.common.transport.DummyTransportAddress; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.ConnectTransportException; +import org.elasticsearch.transport.Transport; +import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.transport.TransportRequestOptions; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.transport.TransportServiceAdapter; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.Matchers.equalTo; + +public class NodeConnectionsServiceTests extends ESTestCase { + + private static ThreadPool THREAD_POOL; + private MockTransport transport; + private TransportService transportService; + + private List generateNodes() { + List nodes = new ArrayList<>(); + for (int i = randomIntBetween(20, 50); i > 0; i--) { + final HashMap attributes = new HashMap<>(); + if (rarely()) { + attributes.put("client", "true"); + } else { + attributes.put("master", "" + randomBoolean()); + attributes.put("data", "" + randomBoolean()); + attributes.put("ingest", "" + randomBoolean()); + } + nodes.add(new DiscoveryNode("node_" + i, "" + i, DummyTransportAddress.INSTANCE, attributes, Version.CURRENT)); + } + return nodes; + } + + private ClusterState clusterStateFromNodes(List nodes) { + final DiscoveryNodes.Builder builder = DiscoveryNodes.builder(); + for (DiscoveryNode node : nodes) { + builder.put(node); + } + return ClusterState.builder(new ClusterName("test")).nodes(builder).build(); + } + + public void testConnectAndDisconnect() { + List nodes = generateNodes(); + NodeConnectionsService service = new NodeConnectionsService(Settings.EMPTY, THREAD_POOL, transportService); + + ClusterState current = clusterStateFromNodes(Collections.emptyList()); + ClusterChangedEvent event = new ClusterChangedEvent("test", clusterStateFromNodes(randomSubsetOf(nodes)), current); + + service.connectToAddedNodes(event); + assertConnected(event.nodesDelta().addedNodes()); + + service.disconnectFromRemovedNodes(event); + assertConnectedExactlyToNodes(event.state()); + + current = event.state(); + event = new ClusterChangedEvent("test", clusterStateFromNodes(randomSubsetOf(nodes)), current); + + service.connectToAddedNodes(event); + assertConnected(event.nodesDelta().addedNodes()); + + service.disconnectFromRemovedNodes(event); + assertConnectedExactlyToNodes(event.state()); + } + + + public void testReconnect() { + List nodes = generateNodes(); + NodeConnectionsService service = new NodeConnectionsService(Settings.EMPTY, THREAD_POOL, transportService); + + ClusterState current = clusterStateFromNodes(Collections.emptyList()); + ClusterChangedEvent event = new ClusterChangedEvent("test", clusterStateFromNodes(randomSubsetOf(nodes)), current); + + transport.randomConnectionExceptions = true; + + service.connectToAddedNodes(event); + + for (int i = 0; i < 3; i++) { + // simulate disconnects + for (DiscoveryNode node : randomSubsetOf(nodes)) { + transport.disconnectFromNode(node); + } + service.new ConnectionChecker().run(); + } + + // disable exceptions so things can be restored + transport.randomConnectionExceptions = false; + service.new ConnectionChecker().run(); + assertConnectedExactlyToNodes(event.state()); + } + + private void assertConnectedExactlyToNodes(ClusterState state) { + assertConnected(state.nodes()); + assertThat(transport.connectedNodes.size(), equalTo(state.nodes().size())); + } + + private void assertConnected(Iterable nodes) { + for (DiscoveryNode node : nodes) { + assertTrue("not connected to " + node, transport.connectedNodes.contains(node)); + } + } + + private void assertNotConnected(Iterable nodes) { + for (DiscoveryNode node : nodes) { + assertFalse("still connected to " + node, transport.connectedNodes.contains(node)); + } + } + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + this.transport = new MockTransport(); + transportService = new TransportService(transport, THREAD_POOL); + transportService.start(); + transportService.acceptIncomingRequests(); + } + + @Override + @After + public void tearDown() throws Exception { + transportService.stop(); + super.tearDown(); + } + + @AfterClass + public static void stopThreadPool() { + ThreadPool.terminate(THREAD_POOL, 30, TimeUnit.SECONDS); + THREAD_POOL = null; + } + + + final class MockTransport implements Transport { + + Set connectedNodes = ConcurrentCollections.newConcurrentSet(); + volatile boolean randomConnectionExceptions = false; + + @Override + public void transportServiceAdapter(TransportServiceAdapter service) { + + } + + @Override + public BoundTransportAddress boundAddress() { + return null; + } + + @Override + public Map profileBoundAddresses() { + return null; + } + + @Override + public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws Exception { + return new TransportAddress[0]; + } + + @Override + public boolean addressSupported(Class address) { + return false; + } + + @Override + public boolean nodeConnected(DiscoveryNode node) { + return connectedNodes.contains(node); + } + + @Override + public void connectToNode(DiscoveryNode node) throws ConnectTransportException { + if (connectedNodes.contains(node) == false && randomConnectionExceptions && randomBoolean()) { + throw new ConnectTransportException(node, "simulated"); + } + connectedNodes.add(node); + } + + @Override + public void connectToNodeLight(DiscoveryNode node) throws ConnectTransportException { + + } + + @Override + public void disconnectFromNode(DiscoveryNode node) { + connectedNodes.remove(node); + } + + @Override + public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, + TransportRequestOptions options) throws IOException, TransportException { + + } + + @Override + public long serverOpen() { + return 0; + } + + @Override + public List getLocalAddresses() { + return null; + } + + @Override + public Lifecycle.State lifecycleState() { + return null; + } + + @Override + public void addLifecycleListener(LifecycleListener listener) { + + } + + @Override + public void removeLifecycleListener(LifecycleListener listener) { + + } + + @Override + public Transport start() { + return null; + } + + @Override + public Transport stop() { + return null; + } + + @Override + public void close() { + + } + } +} \ No newline at end of file diff --git a/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardFailedClusterStateTaskExecutorTests.java b/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardFailedClusterStateTaskExecutorTests.java index 6339c700eec..29ce8e7a636 100644 --- a/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardFailedClusterStateTaskExecutorTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardFailedClusterStateTaskExecutorTests.java @@ -27,6 +27,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateTaskExecutor; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNodeService; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; @@ -40,7 +41,6 @@ import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; -import org.elasticsearch.cluster.service.InternalClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; import org.elasticsearch.test.ESAllocationTestCase; @@ -305,7 +305,7 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa return randomSubsetOf(1, shards.toArray(new ShardRouting[0])).get(0); } else { return - TestShardRouting.newShardRouting(shardRouting.index(), shardRouting.id(), InternalClusterService.generateNodeId(Settings.EMPTY), randomBoolean(), randomFrom(ShardRoutingState.values())); + TestShardRouting.newShardRouting(shardRouting.index(), shardRouting.id(), DiscoveryNodeService.generateNodeId(Settings.EMPTY), randomBoolean(), randomFrom(ShardRoutingState.values())); } } diff --git a/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceTests.java b/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceTests.java new file mode 100644 index 00000000000..ff55de45649 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceTests.java @@ -0,0 +1,824 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.service; + +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateTaskConfig; +import org.elasticsearch.cluster.ClusterStateTaskExecutor; +import org.elasticsearch.cluster.ClusterStateTaskListener; +import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.NodeConnectionsService; +import org.elasticsearch.cluster.block.ClusterBlocks; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.OperationRouting; +import org.elasticsearch.common.Priority; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.DummyTransportAddress; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.MockLogAppender; +import org.elasticsearch.test.junit.annotations.TestLogging; +import org.elasticsearch.threadpool.ThreadPool; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.elasticsearch.common.settings.Settings.settingsBuilder; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +public class ClusterServiceTests extends ESTestCase { + + static ThreadPool threadPool; + TimedClusterService clusterService; + + @BeforeClass + public static void createThreadPool() { + threadPool = new ThreadPool(ClusterServiceTests.class.getName()); + } + + @AfterClass + public static void stopThreadPool() { + if (threadPool != null) { + threadPool.shutdownNow(); + threadPool = null; + } + } + + @Before + public void setUp() throws Exception { + super.setUp(); + clusterService = createClusterService(true); + } + + @After + public void tearDown() throws Exception { + clusterService.close(); + super.tearDown(); + } + + TimedClusterService createClusterService(boolean makeMaster) throws InterruptedException { + TimedClusterService test = new TimedClusterService(Settings.EMPTY, null, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + threadPool, new ClusterName("ClusterServiceTests")); + test.setLocalNode(new DiscoveryNode("node1", DummyTransportAddress.INSTANCE, Version.CURRENT)); + test.setNodeConnectionsService(new NodeConnectionsService(Settings.EMPTY, null, null) { + @Override + public void connectToAddedNodes(ClusterChangedEvent event) { + // skip + } + + @Override + public void disconnectFromRemovedNodes(ClusterChangedEvent event) { + // skip + } + }); + test.setClusterStatePublisher((event, ackListener) -> { + }); + test.start(); + CountDownLatch latch = new CountDownLatch(1); + test.submitStateUpdateTask("making a master", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + final DiscoveryNodes nodes = currentState.nodes(); + final DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(nodes) + .masterNodeId(makeMaster ? nodes.localNodeId() : null); + return ClusterState.builder(currentState).blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK).nodes(nodesBuilder).build(); + } + + @Override + public boolean runOnlyOnMaster() { + return false; + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + latch.countDown(); + } + + @Override + public void onFailure(String source, Throwable t) { + logger.warn("unexpected exception", t); + fail("unexpected exception" + t); + } + }); + latch.await(); + return test; + } + + public void testTimeoutUpdateTask() throws Exception { + final CountDownLatch block = new CountDownLatch(1); + clusterService.submitStateUpdateTask("test1", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + try { + block.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return currentState; + } + + @Override + public void onFailure(String source, Throwable t) { + throw new RuntimeException(t); + } + }); + + final CountDownLatch timedOut = new CountDownLatch(1); + final AtomicBoolean executeCalled = new AtomicBoolean(); + clusterService.submitStateUpdateTask("test2", new ClusterStateUpdateTask() { + @Override + public TimeValue timeout() { + return TimeValue.timeValueMillis(2); + } + + @Override + public void onFailure(String source, Throwable t) { + timedOut.countDown(); + } + + @Override + public ClusterState execute(ClusterState currentState) { + executeCalled.set(true); + return currentState; + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + } + }); + + timedOut.await(); + block.countDown(); + final CountDownLatch allProcessed = new CountDownLatch(1); + clusterService.submitStateUpdateTask("test3", new ClusterStateUpdateTask() { + @Override + public void onFailure(String source, Throwable t) { + throw new RuntimeException(t); + } + + @Override + public ClusterState execute(ClusterState currentState) { + allProcessed.countDown(); + return currentState; + } + + }); + allProcessed.await(); // executed another task to double check that execute on the timed out update task is not called... + assertThat(executeCalled.get(), equalTo(false)); + } + + + public void testMasterAwareExecution() throws Exception { + ClusterService nonMaster = createClusterService(false); + + final boolean[] taskFailed = {false}; + final CountDownLatch latch1 = new CountDownLatch(1); + nonMaster.submitStateUpdateTask("test", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + latch1.countDown(); + return currentState; + } + + @Override + public void onFailure(String source, Throwable t) { + taskFailed[0] = true; + latch1.countDown(); + } + }); + + latch1.await(); + assertTrue("cluster state update task was executed on a non-master", taskFailed[0]); + + taskFailed[0] = true; + final CountDownLatch latch2 = new CountDownLatch(1); + nonMaster.submitStateUpdateTask("test", new ClusterStateUpdateTask() { + @Override + public boolean runOnlyOnMaster() { + return false; + } + + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + taskFailed[0] = false; + latch2.countDown(); + return currentState; + } + + @Override + public void onFailure(String source, Throwable t) { + taskFailed[0] = true; + latch2.countDown(); + } + }); + latch2.await(); + assertFalse("non-master cluster state update task was not executed", taskFailed[0]); + + nonMaster.close(); + } + + /* + * test that a listener throwing an exception while handling a + * notification does not prevent publication notification to the + * executor + */ + public void testClusterStateTaskListenerThrowingExceptionIsOkay() throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(1); + AtomicBoolean published = new AtomicBoolean(); + + clusterService.submitStateUpdateTask( + "testClusterStateTaskListenerThrowingExceptionIsOkay", + new Object(), + ClusterStateTaskConfig.build(Priority.NORMAL), + new ClusterStateTaskExecutor() { + @Override + public boolean runOnlyOnMaster() { + return false; + } + + @Override + public BatchResult execute(ClusterState currentState, List tasks) throws Exception { + ClusterState newClusterState = ClusterState.builder(currentState).build(); + return BatchResult.builder().successes(tasks).build(newClusterState); + } + + @Override + public void clusterStatePublished(ClusterState newClusterState) { + published.set(true); + latch.countDown(); + } + }, + new ClusterStateTaskListener() { + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + throw new IllegalStateException(source); + } + + @Override + public void onFailure(String source, Throwable t) { + } + } + ); + + latch.await(); + assertTrue(published.get()); + } + + // test that for a single thread, tasks are executed in the order + // that they are submitted + public void testClusterStateUpdateTasksAreExecutedInOrder() throws BrokenBarrierException, InterruptedException { + class TaskExecutor implements ClusterStateTaskExecutor { + List tasks = new ArrayList<>(); + + @Override + public BatchResult execute(ClusterState currentState, List tasks) throws Exception { + this.tasks.addAll(tasks); + return BatchResult.builder().successes(tasks).build(ClusterState.builder(currentState).build()); + } + + @Override + public boolean runOnlyOnMaster() { + return false; + } + } + + int numberOfThreads = randomIntBetween(2, 8); + TaskExecutor[] executors = new TaskExecutor[numberOfThreads]; + for (int i = 0; i < numberOfThreads; i++) { + executors[i] = new TaskExecutor(); + } + + int tasksSubmittedPerThread = randomIntBetween(2, 1024); + + CopyOnWriteArrayList> failures = new CopyOnWriteArrayList<>(); + CountDownLatch updateLatch = new CountDownLatch(numberOfThreads * tasksSubmittedPerThread); + + ClusterStateTaskListener listener = new ClusterStateTaskListener() { + @Override + public void onFailure(String source, Throwable t) { + logger.error("unexpected failure: [{}]", t, source); + failures.add(new Tuple<>(source, t)); + updateLatch.countDown(); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + updateLatch.countDown(); + } + }; + + CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads); + + for (int i = 0; i < numberOfThreads; i++) { + final int index = i; + Thread thread = new Thread(() -> { + try { + barrier.await(); + for (int j = 0; j < tasksSubmittedPerThread; j++) { + clusterService.submitStateUpdateTask("[" + index + "][" + j + "]", j, + ClusterStateTaskConfig.build(randomFrom(Priority.values())), executors[index], listener); + } + barrier.await(); + } catch (InterruptedException | BrokenBarrierException e) { + throw new AssertionError(e); + } + }); + thread.start(); + } + + // wait for all threads to be ready + barrier.await(); + // wait for all threads to finish + barrier.await(); + + updateLatch.await(); + + assertThat(failures, empty()); + + for (int i = 0; i < numberOfThreads; i++) { + assertEquals(tasksSubmittedPerThread, executors[i].tasks.size()); + for (int j = 0; j < tasksSubmittedPerThread; j++) { + assertNotNull(executors[i].tasks.get(j)); + assertEquals("cluster state update task executed out of order", j, (int) executors[i].tasks.get(j)); + } + } + } + + public void testClusterStateBatchedUpdates() throws BrokenBarrierException, InterruptedException { + AtomicInteger counter = new AtomicInteger(); + class Task { + private AtomicBoolean state = new AtomicBoolean(); + + public void execute() { + if (!state.compareAndSet(false, true)) { + throw new IllegalStateException(); + } else { + counter.incrementAndGet(); + } + } + } + + int numberOfThreads = randomIntBetween(2, 8); + int tasksSubmittedPerThread = randomIntBetween(1, 1024); + int numberOfExecutors = Math.max(1, numberOfThreads / 4); + final Semaphore semaphore = new Semaphore(numberOfExecutors); + + class TaskExecutor implements ClusterStateTaskExecutor { + private AtomicInteger counter = new AtomicInteger(); + private AtomicInteger batches = new AtomicInteger(); + private AtomicInteger published = new AtomicInteger(); + + @Override + public BatchResult execute(ClusterState currentState, List tasks) throws Exception { + tasks.forEach(task -> task.execute()); + counter.addAndGet(tasks.size()); + ClusterState maybeUpdatedClusterState = currentState; + if (randomBoolean()) { + maybeUpdatedClusterState = ClusterState.builder(currentState).build(); + batches.incrementAndGet(); + semaphore.acquire(); + } + return BatchResult.builder().successes(tasks).build(maybeUpdatedClusterState); + } + + @Override + public boolean runOnlyOnMaster() { + return false; + } + + @Override + public void clusterStatePublished(ClusterState newClusterState) { + published.incrementAndGet(); + semaphore.release(); + } + } + + ConcurrentMap counters = new ConcurrentHashMap<>(); + CountDownLatch updateLatch = new CountDownLatch(numberOfThreads * tasksSubmittedPerThread); + ClusterStateTaskListener listener = new ClusterStateTaskListener() { + @Override + public void onFailure(String source, Throwable t) { + assert false; + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + counters.computeIfAbsent(source, key -> new AtomicInteger()).incrementAndGet(); + updateLatch.countDown(); + } + }; + + List executors = new ArrayList<>(); + for (int i = 0; i < numberOfExecutors; i++) { + executors.add(new TaskExecutor()); + } + + // randomly assign tasks to executors + List assignments = new ArrayList<>(); + for (int i = 0; i < numberOfThreads; i++) { + for (int j = 0; j < tasksSubmittedPerThread; j++) { + assignments.add(randomFrom(executors)); + } + } + + Map counts = new HashMap<>(); + for (TaskExecutor executor : assignments) { + counts.merge(executor, 1, (previous, one) -> previous + one); + } + + CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads); + for (int i = 0; i < numberOfThreads; i++) { + final int index = i; + Thread thread = new Thread(() -> { + try { + barrier.await(); + for (int j = 0; j < tasksSubmittedPerThread; j++) { + ClusterStateTaskExecutor executor = assignments.get(index * tasksSubmittedPerThread + j); + clusterService.submitStateUpdateTask( + Thread.currentThread().getName(), + new Task(), + ClusterStateTaskConfig.build(randomFrom(Priority.values())), + executor, + listener); + } + barrier.await(); + } catch (BrokenBarrierException | InterruptedException e) { + throw new AssertionError(e); + } + }); + thread.start(); + } + + // wait for all threads to be ready + barrier.await(); + // wait for all threads to finish + barrier.await(); + + // wait until all the cluster state updates have been processed + updateLatch.await(); + // and until all of the publication callbacks have completed + semaphore.acquire(numberOfExecutors); + + // assert the number of executed tasks is correct + assertEquals(numberOfThreads * tasksSubmittedPerThread, counter.get()); + + // assert each executor executed the correct number of tasks + for (TaskExecutor executor : executors) { + if (counts.containsKey(executor)) { + assertEquals((int) counts.get(executor), executor.counter.get()); + assertEquals(executor.batches.get(), executor.published.get()); + } + } + + // assert the correct number of clusterStateProcessed events were triggered + for (Map.Entry entry : counters.entrySet()) { + assertEquals(entry.getValue().get(), tasksSubmittedPerThread); + } + } + + /** + * Note, this test can only work as long as we have a single thread executor executing the state update tasks! + */ + public void testPrioritizedTasks() throws Exception { + Settings settings = settingsBuilder() + .put("discovery.type", "local") + .build(); + BlockingTask block = new BlockingTask(Priority.IMMEDIATE); + clusterService.submitStateUpdateTask("test", block); + int taskCount = randomIntBetween(5, 20); + Priority[] priorities = Priority.values(); + + // will hold all the tasks in the order in which they were executed + List tasks = new ArrayList<>(taskCount); + CountDownLatch latch = new CountDownLatch(taskCount); + for (int i = 0; i < taskCount; i++) { + Priority priority = priorities[randomIntBetween(0, priorities.length - 1)]; + clusterService.submitStateUpdateTask("test", new PrioritizedTask(priority, latch, tasks)); + } + + block.release(); + latch.await(); + + Priority prevPriority = null; + for (PrioritizedTask task : tasks) { + if (prevPriority == null) { + prevPriority = task.priority(); + } else { + assertThat(task.priority().sameOrAfter(prevPriority), is(true)); + } + } + } + + @TestLogging("cluster:TRACE") // To ensure that we log cluster state events on TRACE level + public void testClusterStateUpdateLogging() throws Exception { + MockLogAppender mockAppender = new MockLogAppender(); + mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test1", "cluster.service", Level.DEBUG, + "*processing [test1]: took [1s] no change in cluster_state")); + mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test2", "cluster.service", Level.TRACE, + "*failed to execute cluster state update in [2s]*")); + mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test3", "cluster.service", Level.DEBUG, + "*processing [test3]: took [3s] done applying updated cluster_state (version: *, uuid: *)")); + + Logger rootLogger = Logger.getRootLogger(); + rootLogger.addAppender(mockAppender); + try { + final CountDownLatch latch = new CountDownLatch(4); + clusterService.currentTimeOverride = System.nanoTime(); + clusterService.submitStateUpdateTask("test1", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + clusterService.currentTimeOverride += TimeValue.timeValueSeconds(1).nanos(); + return currentState; + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + latch.countDown(); + } + + @Override + public void onFailure(String source, Throwable t) { + fail(); + } + }); + clusterService.submitStateUpdateTask("test2", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + clusterService.currentTimeOverride += TimeValue.timeValueSeconds(2).nanos(); + throw new IllegalArgumentException("Testing handling of exceptions in the cluster state task"); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + fail(); + } + + @Override + public void onFailure(String source, Throwable t) { + latch.countDown(); + } + }); + clusterService.submitStateUpdateTask("test3", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + clusterService.currentTimeOverride += TimeValue.timeValueSeconds(3).nanos(); + return ClusterState.builder(currentState).incrementVersion().build(); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + latch.countDown(); + } + + @Override + public void onFailure(String source, Throwable t) { + fail(); + } + }); + // Additional update task to make sure all previous logging made it to the logger + // We don't check logging for this on since there is no guarantee that it will occur before our check + clusterService.submitStateUpdateTask("test4", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + return currentState; + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + latch.countDown(); + } + + @Override + public void onFailure(String source, Throwable t) { + fail(); + } + }); + latch.await(); + } finally { + rootLogger.removeAppender(mockAppender); + } + mockAppender.assertAllExpectationsMatched(); + } + + @TestLogging("cluster:WARN") // To ensure that we log cluster state events on WARN level + public void testLongClusterStateUpdateLogging() throws Exception { + MockLogAppender mockAppender = new MockLogAppender(); + mockAppender.addExpectation(new MockLogAppender.UnseenEventExpectation("test1 shouldn't see because setting is too low", + "cluster.service", Level.WARN, "*cluster state update task [test1] took [*] above the warn threshold of *")); + mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test2", "cluster.service", Level.WARN, + "*cluster state update task [test2] took [32s] above the warn threshold of *")); + mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test3", "cluster.service", Level.WARN, + "*cluster state update task [test3] took [33s] above the warn threshold of *")); + mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation("test4", "cluster.service", Level.WARN, + "*cluster state update task [test4] took [34s] above the warn threshold of *")); + + Logger rootLogger = Logger.getRootLogger(); + rootLogger.addAppender(mockAppender); + try { + final CountDownLatch latch = new CountDownLatch(5); + final CountDownLatch processedFirstTask = new CountDownLatch(1); + clusterService.currentTimeOverride = System.nanoTime(); + clusterService.submitStateUpdateTask("test1", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + clusterService.currentTimeOverride += TimeValue.timeValueSeconds(1).nanos(); + return currentState; + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + latch.countDown(); + processedFirstTask.countDown(); + } + + @Override + public void onFailure(String source, Throwable t) { + fail(); + } + }); + + processedFirstTask.await(); + clusterService.submitStateUpdateTask("test2", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + clusterService.currentTimeOverride += TimeValue.timeValueSeconds(32).nanos(); + throw new IllegalArgumentException("Testing handling of exceptions in the cluster state task"); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + fail(); + } + + @Override + public void onFailure(String source, Throwable t) { + latch.countDown(); + } + }); + clusterService.submitStateUpdateTask("test3", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + clusterService.currentTimeOverride += TimeValue.timeValueSeconds(33).nanos(); + return ClusterState.builder(currentState).incrementVersion().build(); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + latch.countDown(); + } + + @Override + public void onFailure(String source, Throwable t) { + fail(); + } + }); + clusterService.submitStateUpdateTask("test4", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + clusterService.currentTimeOverride += TimeValue.timeValueSeconds(34).nanos(); + return currentState; + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + latch.countDown(); + } + + @Override + public void onFailure(String source, Throwable t) { + fail(); + } + }); + // Additional update task to make sure all previous logging made it to the logger + // We don't check logging for this on since there is no guarantee that it will occur before our check + clusterService.submitStateUpdateTask("test5", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + return currentState; + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + latch.countDown(); + } + + @Override + public void onFailure(String source, Throwable t) { + fail(); + } + }); + latch.await(); + } finally { + rootLogger.removeAppender(mockAppender); + } + mockAppender.assertAllExpectationsMatched(); + } + + private static class BlockingTask extends ClusterStateUpdateTask { + private final CountDownLatch latch = new CountDownLatch(1); + + public BlockingTask(Priority priority) { + super(priority); + } + + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + latch.await(); + return currentState; + } + + @Override + public void onFailure(String source, Throwable t) { + } + + public void release() { + latch.countDown(); + } + + } + + private static class PrioritizedTask extends ClusterStateUpdateTask { + + private final CountDownLatch latch; + private final List tasks; + + private PrioritizedTask(Priority priority, CountDownLatch latch, List tasks) { + super(priority); + this.latch = latch; + this.tasks = tasks; + } + + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + tasks.add(this); + latch.countDown(); + return currentState; + } + + @Override + public void onFailure(String source, Throwable t) { + latch.countDown(); + } + } + + static class TimedClusterService extends InternalClusterService { + + public volatile Long currentTimeOverride = null; + + public TimedClusterService(Settings settings, OperationRouting operationRouting, ClusterSettings clusterSettings, + ThreadPool threadPool, ClusterName clusterName) { + super(settings, operationRouting, clusterSettings, threadPool, clusterName); + } + + @Override + protected long currentTimeInNanos() { + if (currentTimeOverride != null) { + return currentTimeOverride; + } + return super.currentTimeInNanos(); + } + } +} diff --git a/core/src/test/java/org/elasticsearch/test/MockLogAppender.java b/core/src/test/java/org/elasticsearch/test/MockLogAppender.java index c0866a81081..9e4a881b25b 100644 --- a/core/src/test/java/org/elasticsearch/test/MockLogAppender.java +++ b/core/src/test/java/org/elasticsearch/test/MockLogAppender.java @@ -80,7 +80,7 @@ public class MockLogAppender extends AppenderSkeleton { protected final String logger; protected final Level level; protected final String message; - protected boolean saw; + volatile boolean saw; public AbstractEventExpectation(String name, String logger, Level level, String message) { this.name = name; diff --git a/core/src/test/java/org/elasticsearch/transport/netty/KeyedLockTests.java b/core/src/test/java/org/elasticsearch/transport/netty/KeyedLockTests.java index 9581dfff42f..f9451375590 100644 --- a/core/src/test/java/org/elasticsearch/transport/netty/KeyedLockTests.java +++ b/core/src/test/java/org/elasticsearch/transport/netty/KeyedLockTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.transport.netty; +import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.util.concurrent.KeyedLock; import org.elasticsearch.test.ESTestCase; import org.hamcrest.Matchers; @@ -29,9 +30,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; -import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; public class KeyedLockTests extends ESTestCase { @@ -68,28 +67,6 @@ public class KeyedLockTests extends ESTestCase { } } - public void testCannotAcquireTwoLocks() throws InterruptedException { - KeyedLock connectionLock = new KeyedLock(); - String name = randomRealisticUnicodeOfLength(scaledRandomIntBetween(10, 50)); - connectionLock.acquire(name); - try { - connectionLock.acquire(name); - fail("Expected IllegalStateException"); - } catch (IllegalStateException e) { - assertThat(e.getMessage(), containsString("Lock already acquired")); - } - } - - public void testCannotReleaseUnacquiredLock() throws InterruptedException { - KeyedLock connectionLock = new KeyedLock(); - String name = randomRealisticUnicodeOfLength(scaledRandomIntBetween(10, 50)); - try { - connectionLock.release(name); - fail("Expected IllegalStateException"); - } catch (IllegalStateException e) { - assertThat(e.getMessage(), is("Lock not acquired")); - } - } public static class AcquireAndReleaseThread extends Thread { private CountDownLatch startLatch; @@ -117,16 +94,16 @@ public class KeyedLockTests extends ESTestCase { int numRuns = scaledRandomIntBetween(5000, 50000); for (int i = 0; i < numRuns; i++) { String curName = names[randomInt(names.length - 1)]; - connectionLock.acquire(curName); - try { + assert connectionLock.isHeldByCurrentThread(curName) == false; + try (Releasable ignored = connectionLock.acquire(curName)) { + assert connectionLock.isHeldByCurrentThread(curName); + assert connectionLock.isHeldByCurrentThread(curName + "bla") == false; Integer integer = counter.get(curName); if (integer == null) { counter.put(curName, 1); } else { counter.put(curName, integer.intValue() + 1); } - } finally { - connectionLock.release(curName); } AtomicInteger atomicInteger = new AtomicInteger(0); AtomicInteger value = safeCounter.putIfAbsent(curName, atomicInteger); diff --git a/qa/evil-tests/src/test/java/org/elasticsearch/tribe/TribeUnitTests.java b/qa/evil-tests/src/test/java/org/elasticsearch/tribe/TribeUnitTests.java index 9c68ea196aa..63c09890acc 100644 --- a/qa/evil-tests/src/test/java/org/elasticsearch/tribe/TribeUnitTests.java +++ b/qa/evil-tests/src/test/java/org/elasticsearch/tribe/TribeUnitTests.java @@ -23,7 +23,7 @@ import org.apache.lucene.util.IOUtils; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.service.InternalClusterService; +import org.elasticsearch.cluster.node.DiscoveryNodeService; import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.env.Environment; @@ -66,14 +66,14 @@ public class TribeUnitTests extends ESTestCase { .put(baseSettings) .put("cluster.name", "tribe1") .put("node.name", "tribe1_node") - .put(InternalClusterService.NODE_ID_SEED_SETTING.getKey(), random().nextLong()) + .put(DiscoveryNodeService.NODE_ID_SEED_SETTING.getKey(), random().nextLong()) .build()).start(); tribe2 = new TribeClientNode( Settings.builder() .put(baseSettings) .put("cluster.name", "tribe2") .put("node.name", "tribe2_node") - .put(InternalClusterService.NODE_ID_SEED_SETTING.getKey(), random().nextLong()) + .put(DiscoveryNodeService.NODE_ID_SEED_SETTING.getKey(), random().nextLong()) .build()).start(); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java index 84d88733802..c2cb12644b0 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java @@ -67,6 +67,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Random; @@ -282,10 +283,10 @@ public abstract class ESTestCase extends LuceneTestCase { * Returns a double value in the interval [start, end) if lowerInclusive is * set to true, (start, end) otherwise. * - * @param start lower bound of interval to draw uniformly distributed random numbers from - * @param end upper bound + * @param start lower bound of interval to draw uniformly distributed random numbers from + * @param end upper bound * @param lowerInclusive whether or not to include lower end of the interval - * */ + */ public static double randomDoubleBetween(double start, double end, boolean lowerInclusive) { double result = 0.0; @@ -555,12 +556,27 @@ public abstract class ESTestCase extends LuceneTestCase { * Returns size random values */ public static List randomSubsetOf(int size, T... values) { - if (size > values.length) { - throw new IllegalArgumentException("Can\'t pick " + size + " random objects from a list of " + values.length + " objects"); - } List list = arrayAsArrayList(values); - Collections.shuffle(list, random()); - return list.subList(0, size); + return randomSubsetOf(size, list); + } + + /** + * Returns a random subset of values (including a potential empty list) + */ + public static List randomSubsetOf(Collection collection) { + return randomSubsetOf(randomInt(collection.size() - 1), collection); + } + + /** + * Returns size random values + */ + public static List randomSubsetOf(int size, Collection collection) { + if (size > collection.size()) { + throw new IllegalArgumentException("Can\'t pick " + size + " random objects from a collection of " + collection.size() + " objects"); + } + List tempList = new ArrayList<>(collection); + Collections.shuffle(tempList, random()); + return tempList.subList(0, size); } /** diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 82c7db11d69..43483f17117 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -39,13 +39,13 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.action.index.MappingUpdatedAction; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodeService; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.OperationRouting; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider; import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; -import org.elasticsearch.cluster.service.InternalClusterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.breaker.CircuitBreaker; @@ -591,7 +591,7 @@ public final class InternalTestCluster extends TestCluster { .put(Environment.PATH_HOME_SETTING.getKey(), baseDir) // allow overriding path.home .put(settings) .put("node.name", name) - .put(InternalClusterService.NODE_ID_SEED_SETTING.getKey(), seed) + .put(DiscoveryNodeService.NODE_ID_SEED_SETTING.getKey(), seed) .build(); MockNode node = new MockNode(finalSettings, version, plugins); return new NodeAndClient(name, node); @@ -838,8 +838,8 @@ public final class InternalTestCluster extends TestCluster { IOUtils.rm(nodeEnv.nodeDataPaths()); } } - final long newIdSeed = InternalClusterService.NODE_ID_SEED_SETTING.get(node.settings()) + 1; // use a new seed to make sure we have new node id - Settings finalSettings = Settings.builder().put(node.settings()).put(newSettings).put(InternalClusterService.NODE_ID_SEED_SETTING.getKey(), newIdSeed).build(); + final long newIdSeed = DiscoveryNodeService.NODE_ID_SEED_SETTING.get(node.settings()) + 1; // use a new seed to make sure we have new node id + Settings finalSettings = Settings.builder().put(node.settings()).put(newSettings).put(DiscoveryNodeService.NODE_ID_SEED_SETTING.getKey(), newIdSeed).build(); Collection> plugins = node.getPlugins(); Version version = node.getVersion(); node = new MockNode(finalSettings, version, plugins); diff --git a/test/framework/src/main/java/org/elasticsearch/test/cluster/NoopClusterService.java b/test/framework/src/main/java/org/elasticsearch/test/cluster/NoopClusterService.java index 99ba809c144..ad73a097c1e 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/cluster/NoopClusterService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/cluster/NoopClusterService.java @@ -38,7 +38,6 @@ import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.component.LifecycleListener; import org.elasticsearch.common.transport.DummyTransportAddress; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.tasks.TaskManager; import java.util.List; @@ -153,11 +152,6 @@ public class NoopClusterService implements ClusterService { return TimeValue.timeValueMillis(0); } - @Override - public TaskManager getTaskManager() { - return null; - } - @Override public Lifecycle.State lifecycleState() { return null; diff --git a/test/framework/src/main/java/org/elasticsearch/test/cluster/TestClusterService.java b/test/framework/src/main/java/org/elasticsearch/test/cluster/TestClusterService.java index 3b1082cae44..ebae5cc9947 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/cluster/TestClusterService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/cluster/TestClusterService.java @@ -47,9 +47,7 @@ import org.elasticsearch.common.transport.DummyTransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.FutureUtils; -import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportService; import java.util.Arrays; import java.util.Iterator; @@ -62,7 +60,6 @@ import java.util.concurrent.ScheduledFuture; public class TestClusterService implements ClusterService { volatile ClusterState state; - private volatile TaskManager taskManager; private final List listeners = new CopyOnWriteArrayList<>(); private final Queue onGoingTimeouts = ConcurrentCollections.newQueue(); private final ThreadPool threadPool; @@ -75,12 +72,6 @@ public class TestClusterService implements ClusterService { public TestClusterService(ThreadPool threadPool) { this(ClusterState.builder(new ClusterName("test")).build(), threadPool); - taskManager = new TaskManager(Settings.EMPTY); - } - - public TestClusterService(ThreadPool threadPool, TransportService transportService) { - this(ClusterState.builder(new ClusterName("test")).build(), threadPool); - taskManager = transportService.getTaskManager(); } public TestClusterService(ClusterState state) { @@ -243,11 +234,6 @@ public class TestClusterService implements ClusterService { throw new UnsupportedOperationException(); } - @Override - public TaskManager getTaskManager() { - return taskManager; - } - @Override public List pendingTasks() { throw new UnsupportedOperationException();