Made node config always have a unicast transport uri closure

This commit is contained in:
Ryan Ernst 2016-09-06 15:51:14 -07:00
parent 5d8aa6b4fe
commit a844b085f1
5 changed files with 53 additions and 46 deletions

View File

@ -45,9 +45,12 @@ class ClusterConfiguration {
@Input
int transportPort = 0
/** An override of the data directory. This may only be used with a single node. */
/**
* An override of the data directory. This may only be used with a single node.
* The value is lazily evaluated at runtime as a String path.
*/
@Input
File dataDir = null
Object dataDir = null
/** Optional override of the cluster name. */
@Input
@ -65,16 +68,24 @@ class ClusterConfiguration {
" " + System.getProperty('tests.jvm.argline', '')
/**
* A uri that should be used for the unicast host list.
* A closure to call which returns the unicast host to connect to for cluster formation.
*
* This allows multi node clusters, or a new cluster to connect to an existing cluster.
* The type is Object to allow lazy evaluation. Typically this would be set with a
* closure in a GString like:
*
* {@code "${-> node.transportUri()}"}
* The closure takes two arguments, the NodeInfo for the first node in the cluster, and
* an AntBuilder which may be used to wait on conditions before returning.
*/
@Input
Closure unicastTransportUri = null
Closure unicastTransportUri = { NodeInfo seedNode, NodeInfo node, AntBuilder ant ->
if (seedNode == node) {
return null
}
ant.waitfor(maxwait: '20', maxwaitunit: 'second', checkevery: '500', checkeveryunit: 'millisecond') {
resourceexists {
file(file: seedNode.transportPortsFile.toString())
}
}
return seedNode.transportUri()
}
/**
* A closure to call before the cluster is considered ready. The closure is passed the node info,

View File

@ -86,7 +86,6 @@ class ClusterFormationTasks {
configureDistributionDependency(project, config.distribution, project.configurations.elasticsearchBwcDistro, config.bwcVersion)
}
NodeInfo seedNode = null
for (int i = 0; i < config.numNodes; ++i) {
// we start N nodes and out of these N nodes there might be M bwc nodes.
// for each of those nodes we might have a different configuratioon
@ -97,10 +96,7 @@ class ClusterFormationTasks {
}
NodeInfo node = new NodeInfo(config, i, project, task, elasticsearchVersion, sharedDir)
nodes.add(node)
if (i == 0) {
seedNode = node
}
startTasks.add(configureNode(project, task, cleanup, node, distro, seedNode))
startTasks.add(configureNode(project, task, cleanup, node, distro, nodes.get(0)))
}
Task wait = configureWaitTask("${task.name}#wait", project, nodes, startTasks)
@ -263,20 +259,9 @@ class ClusterFormationTasks {
Task writeConfig = project.tasks.create(name: name, type: DefaultTask, dependsOn: setup)
writeConfig.doFirst {
if (node.config.unicastTransportUri != null) {
// if the unicast transport uri was specified, use it for all nodes
// this will typically be the case if all the nodes we are setting up
// should connect to a master in an already formed cluster
esConfig['discovery.zen.ping.unicast.hosts'] = node.config.unicastTransportUri()
} else if (node.nodeNum > 0) { // multi-node cluster case, we have to wait for the seed node to startup
ant.waitfor(maxwait: '20', maxwaitunit: 'second', checkevery: '500', checkeveryunit: 'millisecond') {
resourceexists {
file(file: seedNode.transportPortsFile.toString())
}
}
// the seed node is enough to form the cluster - all subsequent nodes will get the seed node as a unicast
// host and join the cluster via that.
esConfig['discovery.zen.ping.unicast.hosts'] = "\"${seedNode.transportUri()}\""
String unicastTransportUri = node.config.unicastTransportUri(seedNode, node, project.ant)
if (unicastTransportUri != null) {
esConfig['discovery.zen.ping.unicast.hosts'] = unicastTransportUri
}
File configFile = new File(node.confDir, 'elasticsearch.yml')
logger.info("Configuring ${configFile}")

View File

@ -57,8 +57,8 @@ class NodeInfo {
/** config directory */
File confDir
/** data directory */
File dataDir
/** data directory (as an Object, to allow lazy evaluation) */
Object dataDir
/** THE config file */
File configFile
@ -155,7 +155,7 @@ class NodeInfo {
}
}
env.put('ES_JVM_OPTIONS', new File(confDir, 'jvm.options'))
args.addAll("-E", "path.conf=${confDir}", "-E", "path.data=${dataDir}")
args.addAll("-E", "path.conf=${confDir}", "-E", "path.data=${-> dataDir.toString()}")
if (Os.isFamily(Os.FAMILY_WINDOWS)) {
args.add('"') // end the entire command, quoted
}
@ -199,6 +199,16 @@ class NodeInfo {
return transportPortsFile.readLines("UTF-8").get(0)
}
/** Returns the file which contains the transport protocol ports for this node */
File getTransportPortsFile() {
return transportPortsFile
}
/** Returns the data directory for this node */
File getDataDir() {
return dataDir
}
/** Returns the directory elasticsearch home is contained in for the given distribution */
static File homeDir(File baseDir, String distro, String nodeVersion) {
String path

View File

@ -43,18 +43,22 @@ public class RestSpecHack {
}
/**
* Creates a task to copy the rest spec files.
* Creates a task (if necessary) to copy the rest spec files.
*
* @param project The project to add the copy task to
* @param includePackagedTests true if the packaged tests should be copied, false otherwise
*/
public static Task configureTask(Project project, boolean includePackagedTests) {
Task copyRestSpec = project.tasks.findByName('copyRestSpec')
if (copyRestSpec != null) {
return copyRestSpec
}
Map copyRestSpecProps = [
name : 'copyRestSpec',
type : Copy,
dependsOn: [project.configurations.restSpec, 'processTestResources']
]
Task copyRestSpec = project.tasks.create(copyRestSpecProps) {
copyRestSpec = project.tasks.create(copyRestSpecProps) {
from { project.zipTree(project.configurations.restSpec.singleFile) }
include 'rest-api-spec/api/**'
if (includePackagedTests) {

View File

@ -17,50 +17,47 @@
* under the License.
*/
import org.elasticsearch.gradle.test.NodeInfo
import org.elasticsearch.gradle.test.RestIntegTestTask
RestIntegTestTask oldClusterTask = task oldClusterTest(type: RestIntegTestTask) {
apply plugin: 'elasticsearch.standalone-test'
task oldClusterTest(type: RestIntegTestTask) {
mustRunAfter(precommit)
cluster {
distribution = 'zip'
bwcVersion = '2.4.0' // TODO: either randomize, or make this settable with sysprop
numBwcNodes = 2
numNodes = 0
numNodes = 2
clusterName = 'rolling-upgrade'
}
systemProperty 'tests.rest.suite', 'old_cluster'
}
task mixedClusterTest(type: RestIntegTestTask) {
dependsOn(oldClusterTest, 'oldClusterTest.node1') // TODO: what does this `oldClusterTest.node1` do? is it a task?
NodeInfo aliveNode = oldClusterTask.getNodes().get(0)
NodeInfo stoppedNode = oldClusterTask.getNodes().get(1)
dependsOn(oldClusterTest, 'oldClusterTest#node1.stop')
cluster {
distribution = 'zip'
clusterName = 'rolling-upgrade'
unicastTransportUri = { -> aliveNode.transportUri() }
dataDir = stoppedNode.dataDir
unicastTransportUri = { seedNode, node, ant -> oldClusterTest.nodes.get(0).transportUri() }
dataDir = "${-> oldClusterTest.nodes[1].dataDir}"
}
systemProperty 'tests.rest.suite', 'mixed_cluster'
}
task upgradedClusterTest(type: RestIntegTestTask) {
// stop alive node from oldClusterTest and get its dataDir, and get alive node for unicast host
NodeInfo stoppedNode = null
NodeInfo aliveNode = null
dependsOn(mixedClusterTest, 'oldClusterTest#node0.stop')
cluster {
distribution = 'zip'
clusterName = 'rolling-upgrade'
unicastTransportUri = { -> aliveNode.transportUri() }
dataDir = stoppedNode.dataDir
unicastTransportUri = { seedNode, node, ant -> mixedClusterTest.nodes.get(0).transportUri() }
dataDir = "${-> oldClusterTest.nodes[0].dataDir}"
}
systemProperty 'tests.rest.suite', 'upgraded_cluster'
}
task integTest {
dependsOn = [upgradedClusterTest]
dependsOn = [oldClusterTest, mixedClusterTest, upgradedClusterTest]
}
check.dependsOn(integTest)