Smaller aesthetic fixes to InternalTestCluster (#31831)

Allows cluster to auto-reconfigure faster by starting up nodes in parallel.
This commit is contained in:
Yannick Welsch 2018-07-06 11:42:09 +02:00 committed by GitHub
parent 450a450b2c
commit cce7dc20ad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 15 additions and 27 deletions

View File

@ -263,8 +263,6 @@ public final class InternalTestCluster extends TestCluster {
this.nodePrefix = nodePrefix; this.nodePrefix = nodePrefix;
assert nodePrefix != null; assert nodePrefix != null;
ArrayList<Class<? extends Plugin>> tmpMockPlugins = new ArrayList<>(mockPlugins);
this.mockPlugins = mockPlugins; this.mockPlugins = mockPlugins;
@ -458,14 +456,9 @@ public final class InternalTestCluster extends TestCluster {
private synchronized NodeAndClient getRandomNodeAndClient(Predicate<NodeAndClient> predicate) { private synchronized NodeAndClient getRandomNodeAndClient(Predicate<NodeAndClient> predicate) {
ensureOpen(); ensureOpen();
Collection<NodeAndClient> values = nodes.values().stream().filter(predicate).collect(Collectors.toCollection(ArrayList::new)); List<NodeAndClient> values = nodes.values().stream().filter(predicate).collect(Collectors.toList());
if (!values.isEmpty()) { if (values.isEmpty() == false) {
int whichOne = random.nextInt(values.size()); return randomFrom(random, values);
for (NodeAndClient nodeAndClient : values) {
if (whichOne-- == 0) {
return nodeAndClient;
}
}
} }
return null; return null;
} }
@ -476,18 +469,14 @@ public final class InternalTestCluster extends TestCluster {
* stop any of the running nodes. * stop any of the running nodes.
*/ */
public synchronized void ensureAtLeastNumDataNodes(int n) { public synchronized void ensureAtLeastNumDataNodes(int n) {
boolean added = false;
int size = numDataNodes(); int size = numDataNodes();
for (int i = size; i < n; i++) { if (size < n) {
logger.info("increasing cluster size from {} to {}", size, n); logger.info("increasing cluster size from {} to {}", size, n);
added = true;
if (numSharedDedicatedMasterNodes > 0) { if (numSharedDedicatedMasterNodes > 0) {
startDataOnlyNode(Settings.EMPTY); startDataOnlyNodes(n - size);
} else { } else {
startNode(Settings.EMPTY); startNodes(n - size);
} }
}
if (added) {
validateClusterFormed(); validateClusterFormed();
} }
} }
@ -1361,8 +1350,9 @@ public final class InternalTestCluster extends TestCluster {
.filter(nac -> nodes.containsKey(nac.name) == false) // filter out old masters .filter(nac -> nodes.containsKey(nac.name) == false) // filter out old masters
.count(); .count();
final int currentMasters = getMasterNodesCount(); final int currentMasters = getMasterNodesCount();
if (autoManageMinMasterNodes && currentMasters > 1 && newMasters > 0) { if (autoManageMinMasterNodes && currentMasters > 0 && newMasters > 0 &&
// special case for 1 node master - we can't update the min master nodes before we add more nodes. getMinMasterNodes(currentMasters + newMasters) <= currentMasters) {
// if we're adding too many master-eligible nodes at once, we can't update the min master setting before adding the nodes.
updateMinMasterNodes(currentMasters + newMasters); updateMinMasterNodes(currentMasters + newMasters);
} }
List<Future<?>> futures = nodeAndClients.stream().map(node -> executor.submit(node::startNode)).collect(Collectors.toList()); List<Future<?>> futures = nodeAndClients.stream().map(node -> executor.submit(node::startNode)).collect(Collectors.toList());
@ -1377,7 +1367,8 @@ public final class InternalTestCluster extends TestCluster {
} }
nodeAndClients.forEach(this::publishNode); nodeAndClients.forEach(this::publishNode);
if (autoManageMinMasterNodes && currentMasters == 1 && newMasters > 0) { if (autoManageMinMasterNodes && currentMasters > 0 && newMasters > 0 &&
getMinMasterNodes(currentMasters + newMasters) > currentMasters) {
// update once masters have joined // update once masters have joined
validateClusterFormed(); validateClusterFormed();
updateMinMasterNodes(currentMasters + newMasters); updateMinMasterNodes(currentMasters + newMasters);
@ -1632,27 +1623,24 @@ public final class InternalTestCluster extends TestCluster {
} }
/** /**
* Starts a node with default settings and returns it's name. * Starts a node with default settings and returns its name.
*/ */
public synchronized String startNode() { public synchronized String startNode() {
return startNode(Settings.EMPTY); return startNode(Settings.EMPTY);
} }
/** /**
* Starts a node with the given settings builder and returns it's name. * Starts a node with the given settings builder and returns its name.
*/ */
public synchronized String startNode(Settings.Builder settings) { public synchronized String startNode(Settings.Builder settings) {
return startNode(settings.build()); return startNode(settings.build());
} }
/** /**
* Starts a node with the given settings and returns it's name. * Starts a node with the given settings and returns its name.
*/ */
public synchronized String startNode(Settings settings) { public synchronized String startNode(Settings settings) {
final int defaultMinMasterNodes = getMinMasterNodes(getMasterNodesCount() + (Node.NODE_MASTER_SETTING.get(settings) ? 1 : 0)); return startNodes(settings).get(0);
NodeAndClient buildNode = buildNode(settings, defaultMinMasterNodes);
startAndPublishNodesAndClients(Collections.singletonList(buildNode));
return buildNode.name;
} }
/** /**