Merge pull request #20532 from rjernst/rolling_upgrades

This PR introduces backward compatibility index tests to test the rolling upgrade process amongst Elasticsearch instances within the same major version. The test executes in three phases. In the first phase, we form a cluster of 2 ES instances on an old version. In the second phase, we keep one of the nodes from the old cluster, kill the other node, but preserve its data directory and start an instance of the current version of ES using the same data directory as the killed instance. In the third phase, we kill the other old version ES instance from the first phase and launch a new instance, using the same data directory as the killed instance. Therefore, during phase 3, we have fully migrated and have all current versions of ES running. In each phase, we run REST tests that index documents and search them, ensuring at each stage that the documents from the previous phase are still there.

Note that because we haven't released a GA yet of 5.0, the tests currently don't start an old version cluster in the first phase. Once GA is released, this will be changed to make the backward compatibility version 5.0, while the current version in the cluster will be 5.x.
This commit is contained in:
Ali Beyad 2016-09-19 16:14:38 -04:00 committed by GitHub
commit 50584c4103
12 changed files with 343 additions and 59 deletions

View File

@ -20,8 +20,6 @@ package org.elasticsearch.gradle.test
import org.gradle.api.GradleException
import org.gradle.api.Project
import org.gradle.api.artifacts.Configuration
import org.gradle.api.file.FileCollection
import org.gradle.api.tasks.Input
/** Configuration for an elasticsearch cluster, used for integration tests. */
@ -47,6 +45,17 @@ class ClusterConfiguration {
@Input
int transportPort = 0
/**
* An override of the data directory. This may only be used with a single node.
* The value is lazily evaluated at runtime as a String path.
*/
@Input
Object dataDir = null
/** Optional override of the cluster name. */
@Input
String clusterName = null
@Input
boolean daemonize = true
@ -59,13 +68,24 @@ class ClusterConfiguration {
" " + System.getProperty('tests.jvm.argline', '')
/**
* The seed nodes port file. In the case the cluster has more than one node we use a seed node
* to form the cluster. The file is null if there is no seed node yet available.
* A closure to call which returns the unicast host to connect to for cluster formation.
*
* Note: this can only be null if the cluster has only one node or if the first node is not yet
* configured. All nodes but the first node should see a non null value.
* This allows multi node clusters, or a new cluster to connect to an existing cluster.
* The closure takes two arguments, the NodeInfo for the first node in the cluster, and
* an AntBuilder which may be used to wait on conditions before returning.
*/
File seedNodePortsFile
@Input
Closure unicastTransportUri = { NodeInfo seedNode, NodeInfo node, AntBuilder ant ->
if (seedNode == node) {
return null
}
ant.waitfor(maxwait: '20', maxwaitunit: 'second', checkevery: '500', checkeveryunit: 'millisecond') {
resourceexists {
file(file: seedNode.transportPortsFile.toString())
}
}
return seedNode.transportUri()
}
/**
* A closure to call before the cluster is considered ready. The closure is passed the node info,
@ -75,7 +95,11 @@ class ClusterConfiguration {
@Input
Closure waitCondition = { NodeInfo node, AntBuilder ant ->
File tmpFile = new File(node.cwd, 'wait.success')
ant.get(src: "http://${node.httpUri()}/_cluster/health?wait_for_nodes=${numNodes}",
ant.echo("==> [${new Date()}] checking health: http://${node.httpUri()}/_cluster/health?wait_for_nodes>=${numNodes}")
// checking here for wait_for_nodes to be >= the number of nodes because its possible
// this cluster is attempting to connect to nodes created by another task (same cluster name),
// so there will be more nodes in that case in the cluster state
ant.get(src: "http://${node.httpUri()}/_cluster/health?wait_for_nodes>=${numNodes}",
dest: tmpFile.toString(),
ignoreerrors: true, // do not fail on error, so logging buffers can be flushed by the wait task
retries: 10)
@ -137,12 +161,4 @@ class ClusterConfiguration {
}
extraConfigFiles.put(path, sourceFile)
}
/** Returns an address and port suitable for a uri to connect to this clusters seed node over transport protocol*/
String seedNodeTransportUri() {
if (seedNodePortsFile != null) {
return seedNodePortsFile.readLines("UTF-8").get(0)
}
return null;
}
}

View File

@ -46,9 +46,9 @@ class ClusterFormationTasks {
/**
* Adds dependent tasks to the given task to start and stop a cluster with the given configuration.
*
* Returns a NodeInfo object for the first node in the cluster.
* Returns a list of NodeInfo objects for each node in the cluster.
*/
static NodeInfo setup(Project project, Task task, ClusterConfiguration config) {
static List<NodeInfo> setup(Project project, Task task, ClusterConfiguration config) {
if (task.getEnabled() == false) {
// no need to add cluster formation tasks if the task won't run!
return
@ -95,22 +95,14 @@ class ClusterFormationTasks {
distro = project.configurations.elasticsearchBwcDistro
}
NodeInfo node = new NodeInfo(config, i, project, task, elasticsearchVersion, sharedDir)
if (i == 0) {
if (config.seedNodePortsFile != null) {
// we might allow this in the future to be set but for now we are the only authority to set this!
throw new GradleException("seedNodePortsFile has a non-null value but first node has not been intialized")
}
config.seedNodePortsFile = node.transportPortsFile;
}
nodes.add(node)
startTasks.add(configureNode(project, task, cleanup, node, distro))
startTasks.add(configureNode(project, task, cleanup, node, distro, nodes.get(0)))
}
Task wait = configureWaitTask("${task.name}#wait", project, nodes, startTasks)
task.dependsOn(wait)
// delay the resolution of the uri by wrapping in a closure, so it is not used until read for tests
return nodes[0]
return nodes
}
/** Adds a dependency on the given distribution */
@ -141,7 +133,7 @@ class ClusterFormationTasks {
*
* @return a task which starts the node.
*/
static Task configureNode(Project project, Task task, Object dependsOn, NodeInfo node, Configuration configuration) {
static Task configureNode(Project project, Task task, Object dependsOn, NodeInfo node, Configuration configuration, NodeInfo seedNode) {
// tasks are chained so their execution order is maintained
Task setup = project.tasks.create(name: taskName(task, node, 'clean'), type: Delete, dependsOn: dependsOn) {
@ -154,7 +146,7 @@ class ClusterFormationTasks {
setup = configureCheckPreviousTask(taskName(task, node, 'checkPrevious'), project, setup, node)
setup = configureStopTask(taskName(task, node, 'stopPrevious'), project, setup, node)
setup = configureExtractTask(taskName(task, node, 'extract'), project, setup, node, configuration)
setup = configureWriteConfigTask(taskName(task, node, 'configure'), project, setup, node)
setup = configureWriteConfigTask(taskName(task, node, 'configure'), project, setup, node, seedNode)
setup = configureExtraConfigFilesTask(taskName(task, node, 'extraConfig'), project, setup, node)
setup = configureCopyPluginsTask(taskName(task, node, 'copyPlugins'), project, setup, node)
@ -181,9 +173,10 @@ class ClusterFormationTasks {
Task start = configureStartTask(taskName(task, node, 'start'), project, setup, node)
if (node.config.daemonize) {
// if we are running in the background, make sure to stop the server when the task completes
Task stop = configureStopTask(taskName(task, node, 'stop'), project, [], node)
// if we are running in the background, make sure to stop the server when the task completes
task.finalizedBy(stop)
start.finalizedBy(stop)
}
return start
}
@ -249,7 +242,7 @@ class ClusterFormationTasks {
}
/** Adds a task to write elasticsearch.yml for the given node configuration */
static Task configureWriteConfigTask(String name, Project project, Task setup, NodeInfo node) {
static Task configureWriteConfigTask(String name, Project project, Task setup, NodeInfo node, NodeInfo seedNode) {
Map esConfig = [
'cluster.name' : node.clusterName,
'pidfile' : node.pidFile,
@ -266,15 +259,9 @@ class ClusterFormationTasks {
Task writeConfig = project.tasks.create(name: name, type: DefaultTask, dependsOn: setup)
writeConfig.doFirst {
if (node.nodeNum > 0) { // multi-node cluster case, we have to wait for the seed node to startup
ant.waitfor(maxwait: '20', maxwaitunit: 'second', checkevery: '500', checkeveryunit: 'millisecond') {
resourceexists {
file(file: node.config.seedNodePortsFile.toString())
}
}
// the seed node is enough to form the cluster - all subsequent nodes will get the seed node as a unicast
// host and join the cluster via that.
esConfig['discovery.zen.ping.unicast.hosts'] = "\"${node.config.seedNodeTransportUri()}\""
String unicastTransportUri = node.config.unicastTransportUri(seedNode, node, project.ant)
if (unicastTransportUri != null) {
esConfig['discovery.zen.ping.unicast.hosts'] = "\"${unicastTransportUri}\""
}
File configFile = new File(node.confDir, 'elasticsearch.yml')
logger.info("Configuring ${configFile}")

View File

@ -57,6 +57,9 @@ class NodeInfo {
/** config directory */
File confDir
/** data directory (as an Object, to allow lazy evaluation) */
Object dataDir
/** THE config file */
File configFile
@ -95,11 +98,23 @@ class NodeInfo {
this.config = config
this.nodeNum = nodeNum
this.sharedDir = sharedDir
clusterName = "${task.path.replace(':', '_').substring(1)}"
if (config.clusterName != null) {
clusterName = config.clusterName
} else {
clusterName = "${task.path.replace(':', '_').substring(1)}"
}
baseDir = new File(project.buildDir, "cluster/${task.name} node${nodeNum}")
pidFile = new File(baseDir, 'es.pid')
homeDir = homeDir(baseDir, config.distribution, nodeVersion)
confDir = confDir(baseDir, config.distribution, nodeVersion)
if (config.dataDir != null) {
if (config.numNodes != 1) {
throw new IllegalArgumentException("Cannot set data dir for integ test with more than one node")
}
dataDir = config.dataDir
} else {
dataDir = new File(homeDir, "data")
}
configFile = new File(confDir, 'elasticsearch.yml')
// even for rpm/deb, the logs are under home because we dont start with real services
File logsDir = new File(homeDir, 'logs')
@ -140,7 +155,7 @@ class NodeInfo {
}
}
env.put('ES_JVM_OPTIONS', new File(confDir, 'jvm.options'))
args.addAll("-E", "path.conf=${confDir}")
args.addAll("-E", "path.conf=${confDir}", "-E", "path.data=${-> dataDir.toString()}")
if (Os.isFamily(Os.FAMILY_WINDOWS)) {
args.add('"') // end the entire command, quoted
}
@ -184,6 +199,19 @@ class NodeInfo {
return transportPortsFile.readLines("UTF-8").get(0)
}
/** Returns the file which contains the transport protocol ports for this node */
File getTransportPortsFile() {
return transportPortsFile
}
/** Returns the data directory for this node */
File getDataDir() {
if (!(dataDir instanceof File)) {
return new File(dataDir)
}
return dataDir
}
/** Returns the directory elasticsearch home is contained in for the given distribution */
static File homeDir(File baseDir, String distro, String nodeVersion) {
String path

View File

@ -34,6 +34,9 @@ public class RestIntegTestTask extends RandomizedTestingTask {
ClusterConfiguration clusterConfig
/** Info about nodes in the integ test cluster. Note this is *not* available until runtime. */
List<NodeInfo> nodes
/** Flag indicating whether the rest tests in the rest spec should be run. */
@Input
boolean includePackaged = false
@ -52,6 +55,12 @@ public class RestIntegTestTask extends RandomizedTestingTask {
parallelism = '1'
include('**/*IT.class')
systemProperty('tests.rest.load_packaged', 'false')
systemProperty('tests.rest.cluster', "${-> nodes[0].httpUri()}")
systemProperty('tests.config.dir', "${-> nodes[0].confDir}")
// TODO: our "client" qa tests currently use the rest-test plugin. instead they should have their own plugin
// that sets up the test cluster and passes this transport uri instead of http uri. Until then, we pass
// both as separate sysprops
systemProperty('tests.cluster', "${-> nodes[0].transportUri()}")
// copy the rest spec/tests into the test resources
RestSpecHack.configureDependencies(project)
@ -61,13 +70,7 @@ public class RestIntegTestTask extends RandomizedTestingTask {
// this must run after all projects have been configured, so we know any project
// references can be accessed as a fully configured
project.gradle.projectsEvaluated {
NodeInfo node = ClusterFormationTasks.setup(project, this, clusterConfig)
systemProperty('tests.rest.cluster', "${-> node.httpUri()}")
systemProperty('tests.config.dir', "${-> node.confDir}")
// TODO: our "client" qa tests currently use the rest-test plugin. instead they should have their own plugin
// that sets up the test cluster and passes this transport uri instead of http uri. Until then, we pass
// both as separate sysprops
systemProperty('tests.cluster', "${-> node.transportUri()}")
nodes = ClusterFormationTasks.setup(project, this, clusterConfig)
}
}
@ -88,6 +91,10 @@ public class RestIntegTestTask extends RandomizedTestingTask {
return clusterConfig
}
public List<NodeInfo> getNodes() {
return nodes
}
@Override
public Task dependsOn(Object... dependencies) {
super.dependsOn(dependencies)

View File

@ -43,18 +43,22 @@ public class RestSpecHack {
}
/**
* Creates a task to copy the rest spec files.
* Creates a task (if necessary) to copy the rest spec files.
*
* @param project The project to add the copy task to
* @param includePackagedTests true if the packaged tests should be copied, false otherwise
*/
public static Task configureTask(Project project, boolean includePackagedTests) {
Task copyRestSpec = project.tasks.findByName('copyRestSpec')
if (copyRestSpec != null) {
return copyRestSpec
}
Map copyRestSpecProps = [
name : 'copyRestSpec',
type : Copy,
dependsOn: [project.configurations.restSpec, 'processTestResources']
]
Task copyRestSpec = project.tasks.create(copyRestSpecProps) {
copyRestSpec = project.tasks.create(copyRestSpecProps) {
from { project.zipTree(project.configurations.restSpec.singleFile) }
include 'rest-api-spec/api/**'
if (includePackagedTests) {

View File

@ -0,0 +1,71 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import org.elasticsearch.gradle.test.RestIntegTestTask
apply plugin: 'elasticsearch.standalone-test'
task oldClusterTest(type: RestIntegTestTask) {
mustRunAfter(precommit)
cluster {
distribution = 'zip'
// TODO: Right now, this just forms a cluster with the current version of ES,
// because we don't support clusters with nodes on different alpha/beta releases of ES.
// When the GA is released, we should change the bwcVersion to 5.0.0 and uncomment
// numBwcNodes = 2
//bwcVersion = '5.0.0-alpha5' // TODO: either randomize, or make this settable with sysprop
//numBwcNodes = 2
numNodes = 2
clusterName = 'rolling-upgrade'
}
systemProperty 'tests.rest.suite', 'old_cluster'
}
task mixedClusterTest(type: RestIntegTestTask) {
dependsOn(oldClusterTest, 'oldClusterTest#node1.stop')
cluster {
distribution = 'zip'
clusterName = 'rolling-upgrade'
unicastTransportUri = { seedNode, node, ant -> oldClusterTest.nodes.get(0).transportUri() }
dataDir = "${-> oldClusterTest.nodes[1].dataDir}"
}
systemProperty 'tests.rest.suite', 'mixed_cluster'
finalizedBy 'oldClusterTest#node0.stop'
}
task upgradedClusterTest(type: RestIntegTestTask) {
dependsOn(mixedClusterTest, 'oldClusterTest#node0.stop')
cluster {
distribution = 'zip'
clusterName = 'rolling-upgrade'
unicastTransportUri = { seedNode, node, ant -> mixedClusterTest.nodes.get(0).transportUri() }
dataDir = "${-> oldClusterTest.nodes[0].dataDir}"
}
systemProperty 'tests.rest.suite', 'upgraded_cluster'
// only need to kill the mixed cluster tests node here because we explicitly told it to not stop nodes upon completion
finalizedBy 'mixedClusterTest#stop'
}
task integTest {
dependsOn = [upgradedClusterTest]
}
test.enabled = false // no unit tests for rolling upgrades, only the rest integration test
check.dependsOn(integTest)

View File

@ -0,0 +1,48 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.upgrades;
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
import com.carrotsearch.randomizedtesting.annotations.TimeoutSuite;
import org.apache.lucene.util.TimeUnits;
import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate;
import org.elasticsearch.test.rest.yaml.ESClientYamlSuiteTestCase;
import org.elasticsearch.test.rest.yaml.parser.ClientYamlTestParseException;
import java.io.IOException;
@TimeoutSuite(millis = 5 * TimeUnits.MINUTE) // to account for slow as hell VMs
public class UpgradeClusterClientYamlTestSuiteIT extends ESClientYamlSuiteTestCase {
@Override
protected boolean preserveIndicesUponCompletion() {
return true;
}
public UpgradeClusterClientYamlTestSuiteIT(ClientYamlTestCandidate testCandidate) {
super(testCandidate);
}
@ParametersFactory
public static Iterable<Object[]> parameters() throws IOException, ClientYamlTestParseException {
return createParameters(0, 1);
}
}

View File

@ -0,0 +1,37 @@
---
"Index data and search on the mixed cluster":
- do:
cluster.health:
wait_for_status: green
wait_for_nodes: 2
- do:
search:
index: test_index
- match: { hits.total: 5 } # no new indexed data, so expect the original 5 documents from the old cluster
- do:
bulk:
refresh: true
body:
- '{"index": {"_index": "test_index", "_type": "test_type"}}'
- '{"f1": "v1_mixed", "f2": 5}'
- '{"index": {"_index": "test_index", "_type": "test_type"}}'
- '{"f1": "v2_mixed", "f2": 6}'
- '{"index": {"_index": "test_index", "_type": "test_type"}}'
- '{"f1": "v3_mixed", "f2": 7}'
- '{"index": {"_index": "test_index", "_type": "test_type"}}'
- '{"f1": "v4_mixed", "f2": 8}'
- '{"index": {"_index": "test_index", "_type": "test_type"}}'
- '{"f1": "v5_mixed", "f2": 9}'
- do:
indices.flush:
index: test_index
- do:
search:
index: test_index
- match: { hits.total: 10 } # 5 docs from old cluster, 5 docs from mixed cluster

View File

@ -0,0 +1,34 @@
---
"Index data and search on the old cluster":
- do:
indices.create:
index: test_index
body:
settings:
index:
number_of_replicas: 0
- do:
bulk:
refresh: true
body:
- '{"index": {"_index": "test_index", "_type": "test_type"}}'
- '{"f1": "v1_old", "f2": 0}'
- '{"index": {"_index": "test_index", "_type": "test_type"}}'
- '{"f1": "v2_old", "f2": 1}'
- '{"index": {"_index": "test_index", "_type": "test_type"}}'
- '{"f1": "v3_old", "f2": 2}'
- '{"index": {"_index": "test_index", "_type": "test_type"}}'
- '{"f1": "v4_old", "f2": 3}'
- '{"index": {"_index": "test_index", "_type": "test_type"}}'
- '{"f1": "v5_old", "f2": 4}'
- do:
indices.flush:
index: test_index
- do:
search:
index: test_index
- match: { hits.total: 5 }

View File

@ -0,0 +1,37 @@
---
"Index data and search on the upgraded cluster":
- do:
cluster.health:
wait_for_status: green
wait_for_nodes: 2
- do:
search:
index: test_index
- match: { hits.total: 10 } # no new indexed data, so expect the original 10 documents from the old and mixed clusters
- do:
bulk:
refresh: true
body:
- '{"index": {"_index": "test_index", "_type": "test_type"}}'
- '{"f1": "v1_upgraded", "f2": 10}'
- '{"index": {"_index": "test_index", "_type": "test_type"}}'
- '{"f1": "v2_upgraded", "f2": 11}'
- '{"index": {"_index": "test_index", "_type": "test_type"}}'
- '{"f1": "v3_upgraded", "f2": 12}'
- '{"index": {"_index": "test_index", "_type": "test_type"}}'
- '{"f1": "v4_upgraded", "f2": 13}'
- '{"index": {"_index": "test_index", "_type": "test_type"}}'
- '{"f1": "v5_upgraded", "f2": 14}'
- do:
indices.flush:
index: test_index
- do:
search:
index: test_index
- match: { hits.total: 15 } # 10 docs from previous clusters plus 5 new docs

View File

@ -55,6 +55,7 @@ List projects = [
'plugins:store-smb',
'qa:backwards-5.0',
'qa:evil-tests',
'qa:rolling-upgrade',
'qa:smoke-test-client',
'qa:smoke-test-ingest-with-all-dependencies',
'qa:smoke-test-ingest-disabled',

View File

@ -114,6 +114,7 @@ public class ESRestTestCase extends ESTestCase {
}
}
/**
* Clean up after the test case.
*/
@ -138,14 +139,27 @@ public class ESRestTestCase extends ESTestCase {
return adminClient;
}
/**
* Returns whether to preserve the indices created during this test on completion of this test.
* Defaults to {@code false}. Override this method if indices should be preserved after the test,
* with the assumption that some other process or test will clean up the indices afterward.
* This is useful if the data directory and indices need to be preserved between test runs
* (for example, when testing rolling upgrades).
*/
protected boolean preserveIndicesUponCompletion() {
return false;
}
private void wipeCluster() throws IOException {
// wipe indices
try {
adminClient().performRequest("DELETE", "*");
} catch (ResponseException e) {
// 404 here just means we had no indexes
if (e.getResponse().getStatusLine().getStatusCode() != 404) {
throw e;
if (preserveIndicesUponCompletion() == false) {
// wipe indices
try {
adminClient().performRequest("DELETE", "*");
} catch (ResponseException e) {
// 404 here just means we had no indexes
if (e.getResponse().getStatusLine().getStatusCode() != 404) {
throw e;
}
}
}