Simplify and Fix Synchronization in InternalTestCluster (#39168) (#39241)

* Simplify and Fix Synchronization in InternalTestCluster (#39168)

* Remove unnecessary `synchronized` statements
* Make `Predicate`s constants where possible
* Cleanup some stream usage
* Make unsafe public methods `synchronized`
* Closes #37965
* Closes #37275
* Closes #37345
This commit is contained in:
Armin Braun 2019-02-21 16:27:18 +01:00 committed by GitHub
parent d9de899316
commit 1a21cc0357
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -111,7 +111,6 @@ import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.MockTransportClient;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.TransportSettings;
import org.junit.Assert;
import java.io.Closeable;
import java.io.IOException;
@ -187,6 +186,16 @@ public final class InternalTestCluster extends TestCluster {
private final Logger logger = LogManager.getLogger(getClass());
/* Matches nodes that are data nodes according to their node settings. */
private static final Predicate<NodeAndClient> DATA_NODE_PREDICATE =
nodeAndClient -> DiscoveryNode.isDataNode(nodeAndClient.node.settings());
/* Matches nodes that are neither master nodes nor data nodes according to their node settings. */
private static final Predicate<NodeAndClient> NO_DATA_NO_MASTER_PREDICATE = nodeAndClient ->
DiscoveryNode.isMasterNode(nodeAndClient.node.settings()) == false
&& DiscoveryNode.isDataNode(nodeAndClient.node.settings()) == false;
/* Matches nodes that are master nodes according to their node settings. */
private static final Predicate<NodeAndClient> MASTER_NODE_PREDICATE =
nodeAndClient -> DiscoveryNode.isMasterNode(nodeAndClient.node.settings());
public static final int DEFAULT_LOW_NUM_MASTER_NODES = 1;
public static final int DEFAULT_HIGH_NUM_MASTER_NODES = 3;
@ -197,8 +206,10 @@ public final class InternalTestCluster extends TestCluster {
static final int DEFAULT_MIN_NUM_CLIENT_NODES = 0;
static final int DEFAULT_MAX_NUM_CLIENT_NODES = 1;
/* sorted map to make traverse order reproducible, concurrent since we do checks on it not within a sync block */
private final NavigableMap<String, NodeAndClient> nodes = new TreeMap<>();
/* Sorted map to make traverse order reproducible.
* The map of nodes is never mutated so individual reads are safe without synchronization.
* Updates are intended to follow a copy-on-write approach. */
private volatile NavigableMap<String, NodeAndClient> nodes = Collections.emptyNavigableMap();
private final Set<Path> dataDirToClean = new HashSet<>();
@ -208,7 +219,7 @@ public final class InternalTestCluster extends TestCluster {
private final Settings defaultSettings;
private AtomicInteger nextNodeId = new AtomicInteger(0);
private final AtomicInteger nextNodeId = new AtomicInteger(0);
/* Each shared node has a node seed that is used to start up the node and get default settings
* this is important if a node is randomly shut down in a test since the next test relies on a
@ -240,7 +251,7 @@ public final class InternalTestCluster extends TestCluster {
private final Path baseDir;
private ServiceDisruptionScheme activeDisruptionScheme;
private Function<Client, Client> clientWrapper;
private final Function<Client, Client> clientWrapper;
private int bootstrapMasterNodeIndex = -1;
@ -405,10 +416,6 @@ public final class InternalTestCluster extends TestCluster {
return ZEN_DISCOVERY_TYPE.equals(DISCOVERY_TYPE_SETTING.get(settings));
}
/**
 * Returns the current value of {@link #bootstrapMasterNodeIndex}.
 */
public int getBootstrapMasterNodeIndex() {
    return this.bootstrapMasterNodeIndex;
}
/**
* Sets {@link #bootstrapMasterNodeIndex} to the given value, see {@link #bootstrapMasterNodeWithSpecifiedIndex(List)}
* for the description of how this field is used.
@ -460,7 +467,7 @@ public final class InternalTestCluster extends TestCluster {
return plugins;
}
private Settings getRandomNodeSettings(long seed) {
private static Settings getRandomNodeSettings(long seed) {
Random random = new Random(seed);
Builder builder = Settings.builder();
builder.put(TransportSettings.TRANSPORT_COMPRESS.getKey(), rarely(random));
@ -545,8 +552,8 @@ public final class InternalTestCluster extends TestCluster {
}
}
private synchronized NodeAndClient getOrBuildRandomNode() {
ensureOpen();
private NodeAndClient getOrBuildRandomNode() {
assert Thread.holdsLock(this);
final NodeAndClient randomNodeAndClient = getRandomNodeAndClient();
if (randomNodeAndClient != null) {
return randomNodeAndClient;
@ -566,11 +573,10 @@ public final class InternalTestCluster extends TestCluster {
return buildNode;
}
private synchronized NodeAndClient getRandomNodeAndClient() {
private NodeAndClient getRandomNodeAndClient() {
return getRandomNodeAndClient(nc -> true);
}
private synchronized NodeAndClient getRandomNodeAndClient(Predicate<NodeAndClient> predicate) {
ensureOpen();
List<NodeAndClient> values = nodes.values().stream().filter(predicate).collect(Collectors.toList());
@ -612,7 +618,7 @@ public final class InternalTestCluster extends TestCluster {
final Stream<NodeAndClient> collection = n == 0
? nodes.values().stream()
: nodes.values().stream()
.filter(new DataNodePredicate().and(new NodeNamePredicate(getMasterName()).negate()));
.filter(DATA_NODE_PREDICATE.and(new NodeNamePredicate(getMasterName()).negate()));
final Iterator<NodeAndClient> values = collection.iterator();
logger.info("changing cluster size from {} data nodes to {}", size, n);
@ -676,20 +682,19 @@ public final class InternalTestCluster extends TestCluster {
* the method will return the existing one
* @param onTransportServiceStarted callback to run when transport service is started
*/
private NodeAndClient buildNode(int nodeId, Settings settings,
private synchronized NodeAndClient buildNode(int nodeId, Settings settings,
boolean reuseExisting, Runnable onTransportServiceStarted) {
assert Thread.holdsLock(this);
ensureOpen();
Collection<Class<? extends Plugin>> plugins = getPlugins();
String name = settings.get("node.name");
if (reuseExisting && nodes.containsKey(name)) {
final NodeAndClient nodeAndClient = nodes.get(name);
if (reuseExisting && nodeAndClient != null) {
onTransportServiceStarted.run(); // reusing an existing node implies its transport service already started
return nodes.get(name);
} else {
assert reuseExisting == true || nodes.containsKey(name) == false :
"node name [" + name + "] already exists but not allowed to use it";
return nodeAndClient;
}
assert reuseExisting == true || nodeAndClient == null : "node name [" + name + "] already exists but not allowed to use it";
SecureSettings secureSettings = Settings.builder().put(settings).getSecureSettings();
if (secureSettings instanceof MockSecureSettings) {
@ -726,7 +731,7 @@ public final class InternalTestCluster extends TestCluster {
/**
* returns a suffix string based on the node role. If no explicit role is defined, the suffix will be empty
*/
private String getRoleSuffix(Settings settings) {
private static String getRoleSuffix(Settings settings) {
String suffix = "";
if (Node.NODE_MASTER_SETTING.exists(settings) && Node.NODE_MASTER_SETTING.get(settings)) {
suffix = suffix + Role.MASTER.getAbbreviation();
@ -753,37 +758,32 @@ public final class InternalTestCluster extends TestCluster {
* Returns a node client to a data node in the cluster.
* Note: use this with care; tests should not rely on a certain node's client.
*/
public synchronized Client dataNodeClient() {
ensureOpen();
public Client dataNodeClient() {
/* Randomly return a client to one of the nodes in the cluster */
return getRandomNodeAndClient(new DataNodePredicate()).client(random);
return getRandomNodeAndClient(DATA_NODE_PREDICATE).client(random);
}
/**
* Returns a node client to the current master node.
* Note: use this with care; tests should not rely on a certain node's client.
*
* @return the node client of the node selected by a {@code NodeNamePredicate} built from {@code getMasterName()}
* @throws AssertionError if no node matches that predicate
*/
public synchronized Client masterClient() {
ensureOpen();
public Client masterClient() {
// look up the node via a predicate built from the current master's name
NodeAndClient randomNodeAndClient = getRandomNodeAndClient(new NodeNamePredicate(getMasterName()));
if (randomNodeAndClient != null) {
return randomNodeAndClient.nodeClient(); // ensure node client master is requested
}
Assert.fail("No master client found");
return null; // can't happen
throw new AssertionError("No master client found");
}
/**
* Returns a node client to random node but not the master. This method will fail if no non-master client is available.
*
* @return the node client of a random node that the negated master-name predicate accepts
* @throws AssertionError if no such node is found
*/
public synchronized Client nonMasterClient() {
ensureOpen();
public Client nonMasterClient() {
// same predicate as used for the master lookup, negated, so the master itself is excluded
NodeAndClient randomNodeAndClient = getRandomNodeAndClient(new NodeNamePredicate(getMasterName()).negate());
if (randomNodeAndClient != null) {
return randomNodeAndClient.nodeClient(); // ensure node client non-master is requested
}
Assert.fail("No non-master client found");
return null; // can't happen
throw new AssertionError("No non-master client found");
}
/**
@ -791,14 +791,14 @@ public final class InternalTestCluster extends TestCluster {
*/
public synchronized Client coordOnlyNodeClient() {
ensureOpen();
NodeAndClient randomNodeAndClient = getRandomNodeAndClient(new NoDataNoMasterNodePredicate());
NodeAndClient randomNodeAndClient = getRandomNodeAndClient(NO_DATA_NO_MASTER_PREDICATE);
if (randomNodeAndClient != null) {
return randomNodeAndClient.client(random);
}
int nodeId = nextNodeId.getAndIncrement();
Settings settings = getSettings(nodeId, random.nextLong(), Settings.EMPTY);
startCoordinatingOnlyNode(settings);
return getRandomNodeAndClient(new NoDataNoMasterNodePredicate()).client(random);
return getRandomNodeAndClient(NO_DATA_NO_MASTER_PREDICATE).client(random);
}
public synchronized String startCoordinatingOnlyNode(Settings settings) {
@ -812,7 +812,6 @@ public final class InternalTestCluster extends TestCluster {
* Returns a transport client
*/
public synchronized Client transportClient() {
    ensureOpen();
    // hand out a transport client that targets one randomly chosen node of the cluster
    final NodeAndClient randomNode = getOrBuildRandomNode();
    return randomNode.transportClient();
}
@ -820,27 +819,24 @@ public final class InternalTestCluster extends TestCluster {
/**
* Returns a node client to a given node.
*/
public synchronized Client client(String nodeName) {
ensureOpen();
public Client client(String nodeName) {
NodeAndClient nodeAndClient = nodes.get(nodeName);
if (nodeAndClient != null) {
return nodeAndClient.client(random);
}
Assert.fail("No node found with name: [" + nodeName + "]");
return null; // can't happen
throw new AssertionError("No node found with name: [" + nodeName + "]");
}
/**
* Returns a "smart" node client to a random node in the cluster
*/
public synchronized Client smartClient() {
public Client smartClient() {
NodeAndClient randomNodeAndClient = getRandomNodeAndClient();
if (randomNodeAndClient != null) {
return randomNodeAndClient.nodeClient();
}
Assert.fail("No smart client found");
return null; // can't happen
throw new AssertionError("No smart client found");
}
@Override
@ -853,7 +849,7 @@ public final class InternalTestCluster extends TestCluster {
try {
IOUtils.close(nodes.values());
} finally {
nodes.clear();
nodes = Collections.emptyNavigableMap();
executor.shutdownNow();
}
}
@ -875,7 +871,7 @@ public final class InternalTestCluster extends TestCluster {
this.name = name;
this.originalNodeSettings = originalNodeSettings;
this.nodeAndClientId = nodeAndClientId;
markNodeDataDirsAsNotEligableForWipe(node);
markNodeDataDirsAsNotEligibleForWipe(node);
}
Node node() {
@ -898,9 +894,6 @@ public final class InternalTestCluster extends TestCluster {
}
Client client(Random random) {
if (closed.get()) {
throw new RuntimeException("already closed");
}
double nextDouble = random.nextDouble();
if (nextDouble < transportClientRatio) {
if (logger.isTraceEnabled()) {
@ -927,22 +920,32 @@ public final class InternalTestCluster extends TestCluster {
}
/**
* Returns this node's client passed through {@code clientWrapper}, creating it via {@code node.client()}
* and caching it in {@code nodeClient} when it does not exist yet.
*
* @throws RuntimeException with message "already closed" if the {@code closed} flag is set
*/
private Client getOrBuildNodeClient() {
if (nodeClient == null) {
nodeClient = node.client();
// the closed check and the lazy creation both run while holding the enclosing cluster's monitor
synchronized (InternalTestCluster.this) {
if (closed.get()) {
throw new RuntimeException("already closed");
}
if (nodeClient == null) {
nodeClient = node.client();
}
return clientWrapper.apply(nodeClient);
}
return clientWrapper.apply(nodeClient);
}
/**
* Returns this node's transport client passed through {@code clientWrapper}, creating it with a
* {@code TransportClientFactory} and caching it in {@code transportClient} when it does not exist yet.
*
* @throws RuntimeException with message "already closed" if the {@code closed} flag is set
*/
private Client getOrBuildTransportClient() {
if (transportClient == null) {
/* Don't use a sniffing client for now - it doesn't work with all tests
* since it might throw NoNodeAvailableException if nodes are
* shut down. We first need support for transportClientRatio
* as annotations or so. */
transportClient = new TransportClientFactory(false, nodeConfigurationSource.transportClientSettings(),
// the closed check and the lazy creation both run while holding the enclosing cluster's monitor
synchronized (InternalTestCluster.this) {
if (closed.get()) {
throw new RuntimeException("already closed");
}
if (transportClient == null) {
/* Don't use a sniffing client for now - it doesn't work with all tests
* since it might throw NoNodeAvailableException if nodes are
* shut down. We first need support for transportClientRatio
* as annotations or so. */
transportClient = new TransportClientFactory(nodeConfigurationSource.transportClientSettings(),
baseDir, nodeConfigurationSource.transportClientPlugins()).client(node, clusterName);
}
return clientWrapper.apply(transportClient);
}
return clientWrapper.apply(transportClient);
}
void resetClient() {
@ -1034,11 +1037,12 @@ public final class InternalTestCluster extends TestCluster {
}
});
closed.set(false);
markNodeDataDirsAsNotEligableForWipe(node);
markNodeDataDirsAsNotEligibleForWipe(node);
}
@Override
public void close() throws IOException {
assert Thread.holdsLock(InternalTestCluster.this);
try {
resetClient();
} finally {
@ -1047,18 +1051,32 @@ public final class InternalTestCluster extends TestCluster {
node.close();
}
}
/**
 * Adds the data paths of the given node to {@code dataDirToClean}, provided the node's
 * environment reports that it has a node file. The caller must hold the enclosing cluster's monitor.
 *
 * @param node the node whose data paths are to be recorded
 */
private void markNodeDataDirsAsPendingForWipe(Node node) {
    assert Thread.holdsLock(InternalTestCluster.this);
    final NodeEnvironment environment = node.getNodeEnvironment();
    if (environment.hasNodeFile() == false) {
        return; // nothing to record for this node
    }
    final List<Path> dataPaths = Arrays.asList(environment.nodeDataPaths());
    dataDirToClean.addAll(dataPaths);
}
/**
 * Removes the data paths of the given node from {@code dataDirToClean}, provided the node's
 * environment reports that it has a node file. The caller must hold the enclosing cluster's monitor.
 *
 * @param node the node whose data paths must no longer be recorded
 */
private void markNodeDataDirsAsNotEligibleForWipe(Node node) {
    assert Thread.holdsLock(InternalTestCluster.this);
    final NodeEnvironment environment = node.getNodeEnvironment();
    if (environment.hasNodeFile() == false) {
        return; // nothing was recorded for this node
    }
    final List<Path> dataPaths = Arrays.asList(environment.nodeDataPaths());
    dataDirToClean.removeAll(dataPaths);
}
}
public static final String TRANSPORT_CLIENT_PREFIX = "transport_client_";
static class TransportClientFactory {
private final boolean sniff;
private static class TransportClientFactory {
private final Settings settings;
private final Path baseDir;
private final Collection<Class<? extends Plugin>> plugins;
TransportClientFactory(boolean sniff, Settings settings, Path baseDir, Collection<Class<? extends Plugin>> plugins) {
this.sniff = sniff;
TransportClientFactory(Settings settings, Path baseDir, Collection<Class<? extends Plugin>> plugins) {
this.settings = settings != null ? settings : Settings.EMPTY;
this.baseDir = baseDir;
this.plugins = plugins;
@ -1071,7 +1089,7 @@ public final class InternalTestCluster extends TestCluster {
.put("client.transport.nodes_sampler_interval", "1s")
.put(Environment.PATH_HOME_SETTING.getKey(), baseDir)
.put("node.name", TRANSPORT_CLIENT_PREFIX + node.settings().get("node.name"))
.put(ClusterName.CLUSTER_NAME_SETTING.getKey(), clusterName).put("client.transport.sniff", sniff)
.put(ClusterName.CLUSTER_NAME_SETTING.getKey(), clusterName).put("client.transport.sniff", false)
.put("logger.prefix", nodeSettings.get("logger.prefix", ""))
.put("logger.level", nodeSettings.get("logger.level", "INFO"))
.put(settings);
@ -1120,8 +1138,7 @@ public final class InternalTestCluster extends TestCluster {
// trash all nodes with id >= sharedNodesSeeds.length - they are non shared
final List<NodeAndClient> toClose = new ArrayList<>();
for (Iterator<NodeAndClient> iterator = nodes.values().iterator(); iterator.hasNext();) {
NodeAndClient nodeAndClient = iterator.next();
for (NodeAndClient nodeAndClient : nodes.values()) {
if (nodeAndClient.nodeAndClientId() >= sharedNodesSeeds.length) {
logger.debug("Close Node [{}] not shared", nodeAndClient.name);
toClose.add(nodeAndClient);
@ -1213,7 +1230,7 @@ public final class InternalTestCluster extends TestCluster {
}
/** ensure a cluster is formed with all published nodes, but do so by using the client of the specified node */
public synchronized void validateClusterFormed(String viaNode) {
private synchronized void validateClusterFormed(String viaNode) {
Set<DiscoveryNode> expectedNodes = new HashSet<>();
for (NodeAndClient nodeAndClient : nodes.values()) {
expectedNodes.add(getInstanceFromNode(ClusterService.class, nodeAndClient.node()).localNode());
@ -1242,7 +1259,7 @@ public final class InternalTestCluster extends TestCluster {
}
@Override
public synchronized void afterTest() throws IOException {
public synchronized void afterTest() {
wipePendingDataDirectories();
randomlyResetClients(); /* reset all clients - each test gets its own client based on the Random instance created above. */
}
@ -1291,8 +1308,7 @@ public final class InternalTestCluster extends TestCluster {
private void assertNoPendingIndexOperations() throws Exception {
assertBusy(() -> {
final Collection<NodeAndClient> nodesAndClients = nodes.values();
for (NodeAndClient nodeAndClient : nodesAndClients) {
for (NodeAndClient nodeAndClient : nodes.values()) {
IndicesService indexServices = getInstance(IndicesService.class, nodeAndClient.name);
for (IndexService indexService : indexServices) {
for (IndexShard indexShard : indexService) {
@ -1300,7 +1316,7 @@ public final class InternalTestCluster extends TestCluster {
if (operations.size() > 0) {
throw new AssertionError(
"shard " + indexShard.shardId() + " on node [" + nodeAndClient.name + "] has pending operations:\n --> " +
operations.stream().collect(Collectors.joining("\n --> "))
String.join("\n --> ", operations)
);
}
}
@ -1311,8 +1327,7 @@ public final class InternalTestCluster extends TestCluster {
private void assertOpenTranslogReferences() throws Exception {
assertBusy(() -> {
final Collection<NodeAndClient> nodesAndClients = nodes.values();
for (NodeAndClient nodeAndClient : nodesAndClients) {
for (NodeAndClient nodeAndClient : nodes.values()) {
IndicesService indexServices = getInstance(IndicesService.class, nodeAndClient.name);
for (IndexService indexService : indexServices) {
for (IndexShard indexShard : indexService) {
@ -1331,8 +1346,7 @@ public final class InternalTestCluster extends TestCluster {
private void assertNoSnapshottedIndexCommit() throws Exception {
assertBusy(() -> {
final Collection<NodeAndClient> nodesAndClients = nodes.values();
for (NodeAndClient nodeAndClient : nodesAndClients) {
for (NodeAndClient nodeAndClient : nodes.values()) {
IndicesService indexServices = getInstance(IndicesService.class, nodeAndClient.name);
for (IndexService indexService : indexServices) {
for (IndexShard indexShard : indexService) {
@ -1356,8 +1370,7 @@ public final class InternalTestCluster extends TestCluster {
* This assertion might be expensive, thus we prefer not to execute on every test but only interesting tests.
*/
public void assertConsistentHistoryBetweenTranslogAndLuceneIndex() throws IOException {
final Collection<NodeAndClient> nodesAndClients = nodes.values();
for (NodeAndClient nodeAndClient : nodesAndClients) {
for (NodeAndClient nodeAndClient : nodes.values()) {
IndicesService indexServices = getInstance(IndicesService.class, nodeAndClient.name);
for (IndexService indexService : indexServices) {
for (IndexShard indexShard : indexService) {
@ -1463,6 +1476,7 @@ public final class InternalTestCluster extends TestCluster {
}
private void randomlyResetClients() {
assert Thread.holdsLock(this);
// only reset the clients on nightly tests, it causes heavy load...
if (RandomizedTest.isNightly() && rarely(random)) {
final Collection<NodeAndClient> nodesAndClients = nodes.values();
@ -1489,22 +1503,6 @@ public final class InternalTestCluster extends TestCluster {
}
}
/**
 * Adds the data paths of the given node to {@code dataDirToClean}, provided the node's
 * environment reports that it has a node file. The caller must hold this object's monitor.
 *
 * @param node the node whose data paths are to be recorded
 */
private void markNodeDataDirsAsPendingForWipe(Node node) {
    assert Thread.holdsLock(this);
    final NodeEnvironment environment = node.getNodeEnvironment();
    if (environment.hasNodeFile() == false) {
        return; // nothing to record for this node
    }
    final List<Path> dataPaths = Arrays.asList(environment.nodeDataPaths());
    dataDirToClean.addAll(dataPaths);
}
/**
 * Removes the data paths of the given node from {@code dataDirToClean}, provided the node's
 * environment reports that it has a node file. The caller must hold this object's monitor.
 *
 * @param node the node whose data paths must no longer be recorded
 */
private void markNodeDataDirsAsNotEligableForWipe(Node node) {
    assert Thread.holdsLock(this);
    final NodeEnvironment environment = node.getNodeEnvironment();
    if (environment.hasNodeFile() == false) {
        return; // nothing was recorded for this node
    }
    final List<Path> dataPaths = Arrays.asList(environment.nodeDataPaths());
    dataDirToClean.removeAll(dataPaths);
}
/**
* Returns a reference to a random node's {@link ClusterService}
*/
@ -1515,26 +1513,22 @@ public final class InternalTestCluster extends TestCluster {
/**
* Returns a reference to a node's {@link ClusterService}. If the given node is null, a random node will be selected.
*/
public synchronized ClusterService clusterService(@Nullable String node) {
public ClusterService clusterService(@Nullable String node) {
return getInstance(ClusterService.class, node);
}
/**
* Returns an Iterable to all instances for the given class &lt;T&gt; across all nodes in the cluster.
*
* @param clazz the class whose instance is looked up on every node
* @return one instance per entry of the current {@code nodes} map, in the map's iteration order
*/
public synchronized <T> Iterable<T> getInstances(Class<T> clazz) {
List<T> instances = new ArrayList<>(nodes.size());
for (NodeAndClient nodeAndClient : nodes.values()) {
instances.add(getInstanceFromNode(clazz, nodeAndClient.node));
}
return instances;
public <T> Iterable<T> getInstances(Class<T> clazz) {
return nodes.values().stream().map(node -> getInstanceFromNode(clazz, node.node)).collect(Collectors.toList());
}
/**
* Returns an Iterable to all instances for the given class &lt;T&gt; across all data nodes in the cluster.
*
* @param clazz the class whose instance is looked up on every data node
*/
public synchronized <T> Iterable<T> getDataNodeInstances(Class<T> clazz) {
return getInstances(clazz, new DataNodePredicate());
public <T> Iterable<T> getDataNodeInstances(Class<T> clazz) {
return getInstances(clazz, DATA_NODE_PREDICATE);
}
public synchronized <T> T getCurrentMasterNodeInstance(Class<T> clazz) {
@ -1545,11 +1539,11 @@ public final class InternalTestCluster extends TestCluster {
* Returns an Iterable to all instances for the given class &lt;T&gt; across all data and master nodes
* in the cluster.
*/
public synchronized <T> Iterable<T> getDataOrMasterNodeInstances(Class<T> clazz) {
return getInstances(clazz, new DataNodePredicate().or(new MasterNodePredicate()));
public <T> Iterable<T> getDataOrMasterNodeInstances(Class<T> clazz) {
return getInstances(clazz, DATA_NODE_PREDICATE.or(MASTER_NODE_PREDICATE));
}
private synchronized <T> Iterable<T> getInstances(Class<T> clazz, Predicate<NodeAndClient> predicate) {
private <T> Iterable<T> getInstances(Class<T> clazz, Predicate<NodeAndClient> predicate) {
Iterable<NodeAndClient> filteredNodes = nodes.values().stream().filter(predicate)::iterator;
List<T> instances = new ArrayList<>();
for (NodeAndClient nodeAndClient : filteredNodes) {
@ -1561,16 +1555,16 @@ public final class InternalTestCluster extends TestCluster {
/**
* Returns a reference to the given node's instance of the given class &lt;T&gt;.
*
* @param clazz the class whose instance is looked up
* @param node  the name of the node to look on; if {@code null}, every node is accepted by the lookup predicate
*/
public synchronized <T> T getInstance(Class<T> clazz, final String node) {
public <T> T getInstance(Class<T> clazz, final String node) {
return getInstance(clazz, nc -> node == null || node.equals(nc.name));
}
public synchronized <T> T getDataNodeInstance(Class<T> clazz) {
return getInstance(clazz, new DataNodePredicate());
public <T> T getDataNodeInstance(Class<T> clazz) {
return getInstance(clazz, DATA_NODE_PREDICATE);
}
public synchronized <T> T getMasterNodeInstance(Class<T> clazz) {
return getInstance(clazz, new MasterNodePredicate());
public <T> T getMasterNodeInstance(Class<T> clazz) {
return getInstance(clazz, MASTER_NODE_PREDICATE);
}
private synchronized <T> T getInstance(Class<T> clazz, Predicate<NodeAndClient> predicate) {
@ -1582,17 +1576,17 @@ public final class InternalTestCluster extends TestCluster {
/**
* Returns a reference to a random node's instance of the given class &lt;T&gt;.
*
* @param clazz the class whose instance is looked up; every node is accepted by the lookup predicate
*/
public synchronized <T> T getInstance(Class<T> clazz) {
public <T> T getInstance(Class<T> clazz) {
return getInstance(clazz, nc -> true);
}
private synchronized <T> T getInstanceFromNode(Class<T> clazz, Node node) {
private static <T> T getInstanceFromNode(Class<T> clazz, Node node) {
return node.injector().getInstance(clazz);
}
@Override
public synchronized int size() {
return this.nodes.size();
public int size() {
return nodes.size();
}
@Override
@ -1609,7 +1603,7 @@ public final class InternalTestCluster extends TestCluster {
*/
public synchronized boolean stopRandomDataNode() throws IOException {
ensureOpen();
NodeAndClient nodeAndClient = getRandomNodeAndClient(new DataNodePredicate());
NodeAndClient nodeAndClient = getRandomNodeAndClient(DATA_NODE_PREDICATE);
if (nodeAndClient != null) {
logger.info("Closing random node [{}] ", nodeAndClient.name);
stopNodesAndClient(nodeAndClient);
@ -1638,9 +1632,10 @@ public final class InternalTestCluster extends TestCluster {
ensureOpen();
assert size() > 0;
String masterNodeName = getMasterName();
assert nodes.containsKey(masterNodeName);
final NodeAndClient masterNode = nodes.get(masterNodeName);
assert masterNode != null;
logger.info("Closing master node [{}] ", masterNodeName);
stopNodesAndClient(nodes.get(masterNodeName));
stopNodesAndClient(masterNode);
}
/**
@ -1694,14 +1689,15 @@ public final class InternalTestCluster extends TestCluster {
// cannot be a synchronized method since it's called on other threads from within synchronized startAndPublishNodesAndClients()
synchronized (discoveryFileMutex) {
try {
Stream<NodeAndClient> unicastHosts = Stream.concat(nodes.values().stream(), newNodes.stream());
final Collection<NodeAndClient> currentNodes = nodes.values();
Stream<NodeAndClient> unicastHosts = Stream.concat(currentNodes.stream(), newNodes.stream());
List<String> discoveryFileContents = unicastHosts.map(
nac -> nac.node.injector().getInstance(TransportService.class)
).filter(Objects::nonNull)
nac -> nac.node.injector().getInstance(TransportService.class)
).filter(Objects::nonNull)
.map(TransportService::getLocalNode).filter(Objects::nonNull).filter(DiscoveryNode::isMasterNode)
.map(n -> n.getAddress().toString())
.distinct().collect(Collectors.toList());
Set<Path> configPaths = Stream.concat(nodes.values().stream(), newNodes.stream())
Set<Path> configPaths = Stream.concat(currentNodes.stream(), newNodes.stream())
.map(nac -> nac.node.getEnvironment().configFile()).collect(Collectors.toSet());
logger.debug("configuring discovery with {} at {}", discoveryFileContents, configPaths);
for (final Path configPath : configPaths) {
@ -1714,7 +1710,7 @@ public final class InternalTestCluster extends TestCluster {
}
}
private synchronized void stopNodesAndClient(NodeAndClient nodeAndClient) throws IOException {
private void stopNodesAndClient(NodeAndClient nodeAndClient) throws IOException {
stopNodesAndClients(Collections.singleton(nodeAndClient));
}
@ -1723,7 +1719,7 @@ public final class InternalTestCluster extends TestCluster {
for (NodeAndClient nodeAndClient: nodeAndClients) {
removeDisruptionSchemeFromNode(nodeAndClient);
NodeAndClient previous = nodes.remove(nodeAndClient.name);
final NodeAndClient previous = removeNode(nodeAndClient);
assert previous == nodeAndClient;
nodeAndClient.close();
}
@ -1741,16 +1737,9 @@ public final class InternalTestCluster extends TestCluster {
/**
* Restarts a random data node in the cluster and calls the callback during restart.
*/
public void restartRandomDataNode(RestartCallback callback) throws Exception {
restartRandomNode(new DataNodePredicate(), callback);
}
/**
* Restarts a random node in the cluster and calls the callback during restart.
*/
private synchronized void restartRandomNode(Predicate<NodeAndClient> predicate, RestartCallback callback) throws Exception {
public synchronized void restartRandomDataNode(RestartCallback callback) throws Exception {
ensureOpen();
NodeAndClient nodeAndClient = getRandomNodeAndClient(predicate);
NodeAndClient nodeAndClient = getRandomNodeAndClient(InternalTestCluster.DATA_NODE_PREDICATE);
if (nodeAndClient != null) {
restartNode(nodeAndClient, callback);
}
@ -1788,6 +1777,7 @@ public final class InternalTestCluster extends TestCluster {
}
private void restartNode(NodeAndClient nodeAndClient, RestartCallback callback) throws Exception {
assert Thread.holdsLock(this);
logger.info("Restarting node [{}] ", nodeAndClient.name);
if (activeDisruptionScheme != null) {
@ -1807,8 +1797,9 @@ public final class InternalTestCluster extends TestCluster {
nodeAndClient.startNode();
success = true;
} finally {
if (success == false)
nodes.remove(nodeAndClient.name);
if (success == false) {
removeNode(nodeAndClient);
}
}
if (activeDisruptionScheme != null) {
@ -1828,7 +1819,16 @@ public final class InternalTestCluster extends TestCluster {
}
}
/**
 * Removes the entry keyed by the given node's name from the cluster's node map. The current map
 * is copied, the entry is removed from the copy, and the {@code nodes} field is then replaced by
 * an unmodifiable view of that copy. The caller must hold this object's monitor.
 *
 * @param nodeAndClient the node whose name identifies the entry to remove
 * @return the value previously mapped to that name, or {@code null} if there was none
 */
private NodeAndClient removeNode(NodeAndClient nodeAndClient) {
    assert Thread.holdsLock(this);
    final NavigableMap<String, NodeAndClient> remaining = new TreeMap<>(nodes);
    final NodeAndClient removed = remaining.remove(nodeAndClient.name);
    nodes = Collections.unmodifiableNavigableMap(remaining);
    return removed;
}
private Set<String> excludeMasters(Collection<NodeAndClient> nodeAndClients) {
assert Thread.holdsLock(this);
final Set<String> excludedNodeIds = new HashSet<>();
if (autoManageMinMasterNodes && nodeAndClients.size() > 0) {
@ -1847,7 +1847,7 @@ public final class InternalTestCluster extends TestCluster {
logger.info("adding voting config exclusions {} prior to restart/shutdown", excludedNodeIds);
try {
client().execute(AddVotingConfigExclusionsAction.INSTANCE,
new AddVotingConfigExclusionsRequest(excludedNodeIds.toArray(new String[0]))).get();
new AddVotingConfigExclusionsRequest(excludedNodeIds.toArray(Strings.EMPTY_ARRAY))).get();
} catch (InterruptedException | ExecutionException e) {
throw new AssertionError("unexpected", e);
}
@ -1861,6 +1861,7 @@ public final class InternalTestCluster extends TestCluster {
}
private void removeExclusions(Set<String> excludedNodeIds) {
assert Thread.holdsLock(this);
if (excludedNodeIds.isEmpty() == false) {
logger.info("removing voting config exclusions for {} after restart/shutdown", excludedNodeIds);
try {
@ -1879,7 +1880,7 @@ public final class InternalTestCluster extends TestCluster {
int numNodesRestarted = 0;
final Settings[] newNodeSettings = new Settings[nextNodeId.get()];
Map<Set<Role>, List<NodeAndClient>> nodesByRoles = new HashMap<>();
Set[] rolesOrderedByOriginalStartupOrder = new Set[nextNodeId.get()];
Set[] rolesOrderedByOriginalStartupOrder = new Set[nextNodeId.get()];
final int minMasterNodes = autoManageMinMasterNodes ? getMinMasterNodes(getMasterNodesCount()) : -1;
for (NodeAndClient nodeAndClient : nodes.values()) {
callback.doAfterNodes(numNodesRestarted++, nodeAndClient.nodeClient());
@ -1894,7 +1895,7 @@ public final class InternalTestCluster extends TestCluster {
nodesByRoles.computeIfAbsent(discoveryNode.getRoles(), k -> new ArrayList<>()).add(nodeAndClient);
}
assert nodesByRoles.values().stream().collect(Collectors.summingInt(List::size)) == nodes.size();
assert nodesByRoles.values().stream().mapToInt(List::size).sum() == nodes.size();
// randomize start up order, but making sure that:
// 1) A data folder that was assigned to a data node will stay so
@ -1912,7 +1913,7 @@ public final class InternalTestCluster extends TestCluster {
final List<NodeAndClient> nodesByRole = nodesByRoles.get(roles);
startUpOrder.add(nodesByRole.remove(0));
}
assert nodesByRoles.values().stream().collect(Collectors.summingInt(List::size)) == 0;
assert nodesByRoles.values().stream().mapToInt(List::size).sum() == 0;
for (NodeAndClient nodeAndClient : startUpOrder) {
logger.info("creating node [{}] ", nodeAndClient.name);
@ -1949,17 +1950,14 @@ public final class InternalTestCluster extends TestCluster {
}
}
synchronized Set<String> allDataNodesButN(int numNodes) {
return nRandomDataNodes(numDataNodes() - numNodes);
}
private synchronized Set<String> nRandomDataNodes(int numNodes) {
synchronized Set<String> allDataNodesButN(int count) {
final int numNodes = numDataNodes() - count;
assert size() >= numNodes;
Map<String, NodeAndClient> dataNodes =
nodes
.entrySet()
.stream()
.filter(new EntryNodePredicate(new DataNodePredicate()))
.filter(entry -> DATA_NODE_PREDICATE.test(entry.getValue()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
final HashSet<String> set = new HashSet<>();
final Iterator<String> iterator = dataNodes.keySet().iterator();
@ -1996,7 +1994,8 @@ public final class InternalTestCluster extends TestCluster {
* If {@link #bootstrapMasterNodeIndex} is -1 (default), this method does nothing.
*/
private List<Settings> bootstrapMasterNodeWithSpecifiedIndex(List<Settings> allNodesSettings) {
if (getBootstrapMasterNodeIndex() == -1) { // fast-path
assert Thread.holdsLock(this);
if (bootstrapMasterNodeIndex == -1) { // fast-path
return allNodesSettings;
}
@ -2040,36 +2039,36 @@ public final class InternalTestCluster extends TestCluster {
/**
* Starts a node with default settings and returns its name.
*/
public synchronized String startNode() {
public String startNode() {
return startNode(Settings.EMPTY);
}
/**
* Starts a node with the given settings builder and returns its name.
*/
public synchronized String startNode(Settings.Builder settings) {
public String startNode(Settings.Builder settings) {
return startNode(settings.build());
}
/**
* Starts a node with the given settings and returns its name.
*/
public synchronized String startNode(Settings settings) {
public String startNode(Settings settings) {
return startNodes(settings).get(0);
}
/**
* Starts multiple nodes with default settings and returns their names
*/
public synchronized List<String> startNodes(int numOfNodes) {
public List<String> startNodes(int numOfNodes) {
return startNodes(numOfNodes, Settings.EMPTY);
}
/**
* Starts multiple nodes with the given settings and returns their names
*/
public synchronized List<String> startNodes(int numOfNodes, Settings settings) {
return startNodes(Collections.nCopies(numOfNodes, settings).stream().toArray(Settings[]::new));
public List<String> startNodes(int numOfNodes, Settings settings) {
return startNodes(Collections.nCopies(numOfNodes, settings).toArray(new Settings[0]));
}
/**
@ -2127,11 +2126,11 @@ public final class InternalTestCluster extends TestCluster {
return nodes.stream().map(NodeAndClient::getName).collect(Collectors.toList());
}
public synchronized List<String> startMasterOnlyNodes(int numNodes) {
public List<String> startMasterOnlyNodes(int numNodes) {
return startMasterOnlyNodes(numNodes, Settings.EMPTY);
}
public synchronized List<String> startMasterOnlyNodes(int numNodes, Settings settings) {
public List<String> startMasterOnlyNodes(int numNodes, Settings settings) {
Settings settings1 = Settings.builder()
.put(settings)
.put(Node.NODE_MASTER_SETTING.getKey(), true)
@ -2140,17 +2139,11 @@ public final class InternalTestCluster extends TestCluster {
return startNodes(numNodes, settings1);
}
/**
 * Convenience overload of {@code startDataOnlyNodes(int, Settings)} that supplies
 * {@code Settings.EMPTY} as the additional settings.
 *
 * @param numNodes the number of nodes to start
 * @return the list returned by the two-argument overload
 */
public synchronized List<String> startDataOnlyNodes(int numNodes) {
    final Settings noExtraSettings = Settings.EMPTY;
    return startDataOnlyNodes(numNodes, noExtraSettings);
}
public synchronized List<String> startDataOnlyNodes(int numNodes, Settings settings) {
Settings settings1 = Settings.builder()
.put(settings)
.put(Node.NODE_MASTER_SETTING.getKey(), false)
.put(Node.NODE_DATA_SETTING.getKey(), true)
.build();
return startNodes(numNodes, settings1);
public List<String> startDataOnlyNodes(int numNodes) {
return startNodes(
numNodes,
Settings.builder().put(Settings.EMPTY).put(Node.NODE_MASTER_SETTING.getKey(), false)
.put(Node.NODE_DATA_SETTING.getKey(), true).build());
}
/**
@ -2158,7 +2151,7 @@ public final class InternalTestCluster extends TestCluster {
*
* @param eligibleMasterNodeCount the number of master eligible nodes to use as basis for the min master node setting
*/
private int updateMinMasterNodes(int eligibleMasterNodeCount) {
private void updateMinMasterNodes(int eligibleMasterNodeCount) {
assert autoManageMinMasterNodes;
final int minMasterNodes = getMinMasterNodes(eligibleMasterNodeCount);
if (getMasterNodesCount() > 0) {
@ -2173,23 +2166,22 @@ public final class InternalTestCluster extends TestCluster {
minMasterNodes, getMasterNodesCount());
}
}
return minMasterNodes;
}
/** calculates a min master nodes value based on the given number of master nodes */
private int getMinMasterNodes(int eligibleMasterNodes) {
private static int getMinMasterNodes(int eligibleMasterNodes) {
return eligibleMasterNodes / 2 + 1;
}
private int getMasterNodesCount() {
return (int)nodes.values().stream().filter(n -> Node.NODE_MASTER_SETTING.get(n.node().settings())).count();
return (int) nodes.values().stream().filter(n -> Node.NODE_MASTER_SETTING.get(n.node().settings())).count();
}
public synchronized String startMasterOnlyNode() {
public String startMasterOnlyNode() {
return startMasterOnlyNode(Settings.EMPTY);
}
public synchronized String startMasterOnlyNode(Settings settings) {
public String startMasterOnlyNode(Settings settings) {
Settings settings1 = Settings.builder()
.put(settings)
.put(Node.NODE_MASTER_SETTING.getKey(), true)
@ -2198,10 +2190,11 @@ public final class InternalTestCluster extends TestCluster {
return startNode(settings1);
}
public synchronized String startDataOnlyNode() {
public String startDataOnlyNode() {
return startDataOnlyNode(Settings.EMPTY);
}
public synchronized String startDataOnlyNode(Settings settings) {
public String startDataOnlyNode(Settings settings) {
Settings settings1 = Settings.builder()
.put(settings)
.put(Node.NODE_MASTER_SETTING.getKey(), false)
@ -2212,7 +2205,9 @@ public final class InternalTestCluster extends TestCluster {
private synchronized void publishNode(NodeAndClient nodeAndClient) {
assert !nodeAndClient.node().isClosed();
nodes.put(nodeAndClient.name, nodeAndClient);
final NavigableMap<String, NodeAndClient> newNodes = new TreeMap<>(nodes);
newNodes.put(nodeAndClient.name, nodeAndClient);
nodes = Collections.unmodifiableNavigableMap(newNodes);
applyDisruptionSchemeToNode(nodeAndClient);
}
@ -2227,10 +2222,10 @@ public final class InternalTestCluster extends TestCluster {
@Override
public int numDataAndMasterNodes() {
return dataAndMasterNodes().size();
return filterNodes(nodes, DATA_NODE_PREDICATE.or(MASTER_NODE_PREDICATE)).size();
}
public synchronized int numMasterNodes() {
public int numMasterNodes() {
return filterNodes(nodes, NodeAndClient::isMasterEligible).size();
}
@ -2245,7 +2240,8 @@ public final class InternalTestCluster extends TestCluster {
clearDisruptionScheme(true);
}
public void clearDisruptionScheme(boolean ensureHealthyCluster) {
// synchronized to prevent concurrently modifying the cluster.
public synchronized void clearDisruptionScheme(boolean ensureHealthyCluster) {
if (activeDisruptionScheme != null) {
TimeValue expectedHealingTime = activeDisruptionScheme.expectedTimeToHeal();
logger.info("Clearing active scheme {}, expected healing time {}", activeDisruptionScheme, expectedHealingTime);
@ -2272,15 +2268,11 @@ public final class InternalTestCluster extends TestCluster {
}
}
private synchronized Collection<NodeAndClient> dataNodeAndClients() {
return filterNodes(nodes, new DataNodePredicate());
private Collection<NodeAndClient> dataNodeAndClients() {
return filterNodes(nodes, DATA_NODE_PREDICATE);
}
/**
 * Filters {@code nodes} down to the entries accepted by either {@code DataNodePredicate}
 * or {@code MasterNodePredicate}.
 *
 * @return the collection produced by {@code filterNodes} for that combined predicate
 */
private synchronized Collection<NodeAndClient> dataAndMasterNodes() {
    final Predicate<NodeAndClient> dataOrMaster = new DataNodePredicate().or(new MasterNodePredicate());
    return filterNodes(nodes, dataOrMaster);
}
private synchronized Collection<NodeAndClient> filterNodes(Map<String, InternalTestCluster.NodeAndClient> map,
private static Collection<NodeAndClient> filterNodes(Map<String, InternalTestCluster.NodeAndClient> map,
Predicate<NodeAndClient> predicate) {
return map
.values()
@ -2289,51 +2281,16 @@ public final class InternalTestCluster extends TestCluster {
.collect(Collectors.toCollection(ArrayList::new));
}
/**
 * Predicate over {@link NodeAndClient} that defers to
 * {@code DiscoveryNode.isDataNode} on the wrapped node's settings.
 */
private static final class DataNodePredicate implements Predicate<NodeAndClient> {

    /**
     * @param candidate the node/client pair to inspect
     * @return the result of {@code DiscoveryNode.isDataNode} for the candidate's node settings
     */
    @Override
    public boolean test(NodeAndClient candidate) {
        final boolean holdsData = DiscoveryNode.isDataNode(candidate.node.settings());
        return holdsData;
    }
}
/**
 * Predicate over {@link NodeAndClient} that defers to
 * {@code DiscoveryNode.isMasterNode} on the wrapped node's settings.
 */
private static final class MasterNodePredicate implements Predicate<NodeAndClient> {

    /**
     * @param candidate the node/client pair to inspect
     * @return the result of {@code DiscoveryNode.isMasterNode} for the candidate's node settings
     */
    @Override
    public boolean test(NodeAndClient candidate) {
        final boolean masterEligible = DiscoveryNode.isMasterNode(candidate.node.settings());
        return masterEligible;
    }
}
private static final class NodeNamePredicate implements Predicate<NodeAndClient> {
private final HashSet<String> nodeNames;
private final String nodeName;
NodeNamePredicate(String... nodeNames) {
this.nodeNames = Sets.newHashSet(nodeNames);
NodeNamePredicate(String nodeName) {
this.nodeName = nodeName;
}
@Override
public boolean test(NodeAndClient nodeAndClient) {
return nodeNames.contains(nodeAndClient.getName());
}
}
/**
 * Predicate over {@link NodeAndClient} that accepts a node only when both
 * {@code DiscoveryNode.isMasterNode} and {@code DiscoveryNode.isDataNode} report
 * {@code false} for its settings.
 */
private static final class NoDataNoMasterNodePredicate implements Predicate<NodeAndClient> {

    /**
     * @param candidate the node/client pair to inspect
     * @return {@code true} only if the candidate's settings describe neither a master nor a data node
     */
    @Override
    public boolean test(NodeAndClient candidate) {
        // The master check runs first and short-circuits: the data check is only reached
        // when the node is not a master node.
        if (DiscoveryNode.isMasterNode(candidate.node.settings())) {
            return false;
        }
        return DiscoveryNode.isDataNode(candidate.node.settings()) == false;
    }
}
private static final class EntryNodePredicate implements Predicate<Map.Entry<String, NodeAndClient>> {
private final Predicate<NodeAndClient> delegateNodePredicate;
EntryNodePredicate(Predicate<NodeAndClient> delegateNodePredicate) {
this.delegateNodePredicate = delegateNodePredicate;
}
@Override
public boolean test(Map.Entry<String, NodeAndClient> entry) {
return delegateNodePredicate.test(entry.getValue());
return nodeName.equals(nodeAndClient.getName());
}
}
@ -2365,8 +2322,7 @@ public final class InternalTestCluster extends TestCluster {
}
@Override
public synchronized Iterable<Client> getClients() {
ensureOpen();
public Iterable<Client> getClients() {
return () -> {
ensureOpen();
final Iterator<NodeAndClient> iterator = nodes.values().iterator();
@ -2498,7 +2454,7 @@ public final class InternalTestCluster extends TestCluster {
}
@Override
public void assertAfterTest() throws IOException {
public synchronized void assertAfterTest() throws IOException {
super.assertAfterTest();
assertRequestsFinished();
for (NodeAndClient nodeAndClient : nodes.values()) {
@ -2515,6 +2471,7 @@ public final class InternalTestCluster extends TestCluster {
}
private void assertRequestsFinished() {
assert Thread.holdsLock(this);
if (size() > 0) {
for (NodeAndClient nodeAndClient : nodes.values()) {
CircuitBreaker inFlightRequestsBreaker = getInstance(CircuitBreakerService.class, nodeAndClient.name)