Tests: Make sure snapshots created with old version of elasticsearch can be restored

Closes #8968
This commit is contained in:
Igor Motov 2014-12-15 15:26:34 -05:00
parent 3c0d2081cf
commit fcb0186d2e
29 changed files with 212 additions and 4 deletions

View File

@ -113,7 +113,7 @@ def start_node(version, release_dir, data_dir, tcp_port, http_port):
'-Des.transport.tcp.port=%s' % tcp_port,
'-Des.http.port=%s' % http_port
]
if version.startswith('0.') or version == '1.0.0.Beta1':
if version.startswith('0.') or version.startswith('1.0.0.Beta') :
cmd.append('-f') # version before 1.0 start in background automatically
return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
@ -150,17 +150,60 @@ def generate_index(client):
logging.info('Running basic asserts on the data added')
run_basic_asserts(client, 'test', 'doc', num_docs)
def snapshot_index(client, cfg):
# Add bogus persistent settings to make sure they can be restored
client.cluster.put_settings(body = {
'persistent': {
'cluster.routing.allocation.exclude.version_attr' : cfg.version
}
})
client.indices.put_template(name = 'template_' + cfg.version.lower(), order = 0, body = {
"template" : "te*",
"settings" : {
"number_of_shards" : 1
},
"mappings" : {
"type1" : {
"_source" : { "enabled" : False }
}
},
"aliases" : {
"alias1" : {},
"alias2" : {
"filter" : {
"term" : {"version" : cfg.version }
},
"routing" : "kimchy"
},
"{index}-alias" : {}
}
});
client.snapshot.create_repository(repository='test_repo', body={
'type': 'fs',
'settings': {
'location': cfg.repo_dir
}
})
client.snapshot.create(repository='test_repo', snapshot='test_1', wait_for_completion=True)
def compress_index(version, tmp_dir, output_dir):
compress(tmp_dir, output_dir, 'index-%s.zip' % version, 'data')
def compress_repo(version, tmp_dir, output_dir):
compress(tmp_dir, output_dir, 'repo-%s.zip' % version, 'repo')
def compress(tmp_dir, output_dir, zipfile, directory):
abs_output_dir = os.path.abspath(output_dir)
zipfile = os.path.join(abs_output_dir, 'index-%s.zip' % version)
zipfile = os.path.join(abs_output_dir, zipfile)
if os.path.exists(zipfile):
os.remove(zipfile)
logging.info('Compressing index into %s', zipfile)
olddir = os.getcwd()
os.chdir(tmp_dir)
subprocess.check_call('zip -r %s *' % zipfile, shell=True)
subprocess.check_call('zip -r %s %s' % (zipfile, directory), shell=True)
os.chdir(olddir)
def parse_config():
parser = argparse.ArgumentParser(description='Builds an elasticsearch index for backwards compatibility tests')
parser.add_argument('version', metavar='X.Y.Z',
@ -184,7 +227,10 @@ def parse_config():
cfg.tmp_dir = tempfile.mkdtemp()
cfg.data_dir = os.path.join(cfg.tmp_dir, 'data')
cfg.repo_dir = os.path.join(cfg.tmp_dir, 'repo')
logging.info('Temp data dir: %s' % cfg.data_dir)
logging.info('Temp repo dir: %s' % cfg.repo_dir)
cfg.snapshot_supported = not (cfg.version.startswith('0.') or cfg.version == '1.0.0.Beta1')
return cfg
@ -193,17 +239,21 @@ def main():
datefmt='%Y-%m-%d %I:%M:%S %p')
logging.getLogger('elasticsearch').setLevel(logging.ERROR)
logging.getLogger('urllib3').setLevel(logging.WARN)
cfg = parse_config()
try:
node = start_node(cfg.version, cfg.release_dir, cfg.data_dir, cfg.tcp_port, cfg.http_port)
client = create_client(cfg.http_port)
generate_index(client)
if cfg.snapshot_supported:
snapshot_index(client, cfg)
finally:
if 'node' in vars():
logging.info('Shutting down node with pid %d', node.pid)
node.terminate()
time.sleep(1) # some nodes take time to terminate
compress_index(cfg.version, cfg.tmp_dir, cfg.output_dir)
if cfg.snapshot_supported:
compress_repo(cfg.version, cfg.tmp_dir, cfg.output_dir)
if __name__ == '__main__':
try:

View File

@ -0,0 +1,158 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.bwcompat;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.snapshots.AbstractSnapshotTests;
import org.elasticsearch.snapshots.RestoreInfo;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
import org.junit.Test;
import java.io.IOException;
import java.lang.reflect.Modifier;
import java.net.URI;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Locale;
import java.util.SortedSet;
import java.util.TreeSet;
import static com.google.common.collect.Lists.newArrayList;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.*;
@Slow
@ClusterScope(scope = Scope.TEST)
public class RestoreBackwardsCompatTests extends AbstractSnapshotTests {
@Test
public void restoreOldSnapshots() throws Exception {
String repo = "test_repo";
String snapshot = "test_1";
List<String> repoVersions = repoVersions();
assertThat(repoVersions.size(), greaterThan(0));
for (String version : repoVersions) {
createRepo(version, repo);
testOldSnapshot(version, repo, snapshot);
}
SortedSet<String> expectedVersions = new TreeSet<>();
for (java.lang.reflect.Field field : Version.class.getDeclaredFields()) {
if (Modifier.isStatic(field.getModifiers()) && field.getType() == Version.class) {
Version v = (Version) field.get(Version.class);
if (v.snapshot()) continue;
if (v.onOrBefore(Version.V_1_0_0_Beta1)) continue;
expectedVersions.add(v.toString());
}
}
for (String repoVersion : repoVersions) {
if (expectedVersions.remove(repoVersion) == false) {
logger.warn("Old repositories tests contain extra repo: " + repoVersion);
}
}
if (expectedVersions.isEmpty() == false) {
StringBuilder msg = new StringBuilder("Old repositories tests are missing versions:");
for (String expected : expectedVersions) {
msg.append("\n" + expected);
}
fail(msg.toString());
}
}
public static List<String> repoVersions() throws Exception {
List<String> repoVersions = newArrayList();
Path repoFiles = Paths.get(RestoreBackwardsCompatTests.class.getResource(".").toURI());
try (DirectoryStream<Path> stream = Files.newDirectoryStream(repoFiles, "repo-*.zip")) {
for (Path entry : stream) {
String fileName = entry.getFileName().toString();
String version = fileName.substring("repo-".length());
version = version.substring(0, version.length() - ".zip".length());
repoVersions.add(version);
}
}
return repoVersions;
}
private void createRepo(String version, String repo) throws Exception {
String repoFile = "repo-" + version + ".zip";
URI repoFileUri = getClass().getResource(repoFile).toURI();
URI repoJarUri = new URI("jar:" + repoFileUri.toString() + "!/repo/");
logger.info("--> creating repository [{}] for version [{}]", repo, version);
assertAcked(client().admin().cluster().preparePutRepository(repo)
.setType("url").setSettings(ImmutableSettings.settingsBuilder()
.put("url", repoJarUri.toString())));
}
private void testOldSnapshot(String version, String repo, String snapshot) throws IOException {
logger.info("--> restoring snapshot");
RestoreSnapshotResponse response = client().admin().cluster().prepareRestoreSnapshot(repo, snapshot).setRestoreGlobalState(true).setWaitForCompletion(true).get();
assertThat(response.status(), equalTo(RestStatus.OK));
RestoreInfo restoreInfo = response.getRestoreInfo();
assertThat(restoreInfo.successfulShards(), greaterThan(0));
assertThat(restoreInfo.successfulShards(), equalTo(restoreInfo.totalShards()));
assertThat(restoreInfo.failedShards(), equalTo(0));
String index = restoreInfo.indices().get(0);
logger.info("--> check search");
SearchResponse searchResponse = client().prepareSearch(index).get();
assertThat(searchResponse.getHits().totalHits(), greaterThan(1L));
logger.info("--> check settings");
ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
assertThat(clusterState.metaData().persistentSettings().get(FilterAllocationDecider.CLUSTER_ROUTING_EXCLUDE_GROUP + "version_attr"), equalTo(version));
logger.info("--> check templates");
IndexTemplateMetaData template = clusterState.getMetaData().templates().get("template_" + version.toLowerCase(Locale.ROOT));
assertThat(template, notNullValue());
assertThat(template.template(), equalTo("te*"));
assertThat(template.settings().getAsInt(IndexMetaData.SETTING_NUMBER_OF_SHARDS, -1), equalTo(1));
assertThat(template.mappings().size(), equalTo(1));
assertThat(template.mappings().get("type1").string(), equalTo("{\"type1\":{\"_source\":{\"enabled\":false}}}"));
if (Version.fromString(version).onOrAfter(Version.V_1_1_0)) {
// Support for aliases in templates was added in v1.1.0
assertThat(template.aliases().size(), equalTo(3));
assertThat(template.aliases().get("alias1"), notNullValue());
assertThat(template.aliases().get("alias2").filter().string(), containsString(version));
assertThat(template.aliases().get("alias2").indexRouting(), equalTo("kimchy"));
assertThat(template.aliases().get("{index}-alias"), notNullValue());
}
logger.info("--> cleanup");
cluster().wipeIndices(restoreInfo.indices().toArray(new String[restoreInfo.indices().size()]));
cluster().wipeTemplates();
}
}