Add qa module that tests reindex-from-remote against pre-5.0 versions of Elasticsearch (#24561)

Adds tests for reindex-from-remote for the latest 2.4, 1.7, and
0.90 releases. 2.4 and 1.7 are fairly popular versions but 0.90
is a point of pride.

This fixes any issues those tests revealed.

Closes #23828
Closes #24520
This commit is contained in:
Nik Everett 2017-05-11 10:06:20 -04:00 committed by GitHub
parent 8f798f1231
commit 8188569fd1
9 changed files with 417 additions and 16 deletions

View File

@ -43,6 +43,7 @@ import java.util.HashMap;
import java.util.Map;
import static java.util.Collections.singletonMap;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
final class RemoteRequestBuilders {
private RemoteRequestBuilders() {}
@ -59,7 +60,14 @@ final class RemoteRequestBuilders {
static Map<String, String> initialSearchParams(SearchRequest searchRequest, Version remoteVersion) {
Map<String, String> params = new HashMap<>();
if (searchRequest.scroll() != null) {
params.put("scroll", searchRequest.scroll().keepAlive().getStringRep());
TimeValue keepAlive = searchRequest.scroll().keepAlive();
if (remoteVersion.before(Version.V_5_0_0)) {
/* Versions of Elasticsearch before 5.0 couldn't parse nanos or micros
* so we toss out that resolution, rounding up because more scroll
* timeout seems safer than less. */
keepAlive = timeValueMillis((long) Math.ceil(keepAlive.millisFrac()));
}
params.put("scroll", keepAlive.getStringRep());
}
params.put("size", Integer.toString(searchRequest.source().size()));
if (searchRequest.source().version() == null || searchRequest.source().version() == true) {
@ -93,6 +101,10 @@ final class RemoteRequestBuilders {
if (remoteVersion.before(Version.fromId(2000099))) {
// Versions before 2.0.0 need prompting to return interesting fields. Note that timestamp isn't available at all....
searchRequest.source().storedField("_parent").storedField("_routing").storedField("_ttl");
if (remoteVersion.before(Version.fromId(1000099))) {
// Versions before 1.0.0 don't support `"_source": true` so we have to ask for the _source in a funny way.
searchRequest.source().storedField("_source");
}
}
if (searchRequest.source().storedFields() != null && false == searchRequest.source().storedFields().fieldNames().isEmpty()) {
StringBuilder fields = new StringBuilder(searchRequest.source().storedFields().fieldNames().get(0));
@ -105,7 +117,7 @@ final class RemoteRequestBuilders {
return params;
}
static HttpEntity initialSearchEntity(SearchRequest searchRequest, BytesReference query) {
static HttpEntity initialSearchEntity(SearchRequest searchRequest, BytesReference query, Version remoteVersion) {
// EMPTY is safe here because we're not calling namedObject
try (XContentBuilder entity = JsonXContent.contentBuilder();
XContentParser queryParser = XContentHelper.createParser(NamedXContentRegistry.EMPTY, query)) {
@ -125,7 +137,10 @@ final class RemoteRequestBuilders {
if (searchRequest.source().fetchSource() != null) {
entity.field("_source", searchRequest.source().fetchSource());
} else {
entity.field("_source", true);
if (remoteVersion.onOrAfter(Version.fromId(1000099))) {
// Versions before 1.0 don't support `"_source": true` so we have to ask for the source as a stored field.
entity.field("_source", true);
}
}
entity.endObject();
@ -167,7 +182,13 @@ final class RemoteRequestBuilders {
return "/_search/scroll";
}
static Map<String, String> scrollParams(TimeValue keepAlive) {
static Map<String, String> scrollParams(TimeValue keepAlive, Version remoteVersion) {
if (remoteVersion.before(Version.V_5_0_0)) {
/* Versions of Elasticsearch before 5.0 couldn't parse nanos or micros
* so we toss out that resolution, rounding up so we shouldn't end up
* with 0s. */
keepAlive = timeValueMillis((long) Math.ceil(keepAlive.millisFrac()));
}
return singletonMap("scroll", keepAlive.getStringRep());
}

View File

@ -87,7 +87,7 @@ public class RemoteScrollableHitSource extends ScrollableHitSource {
lookupRemoteVersion(version -> {
remoteVersion = version;
execute("POST", initialSearchPath(searchRequest), initialSearchParams(searchRequest, version),
initialSearchEntity(searchRequest, query), RESPONSE_PARSER, r -> onStartResponse(onResponse, r));
initialSearchEntity(searchRequest, query, remoteVersion), RESPONSE_PARSER, r -> onStartResponse(onResponse, r));
});
}
@ -106,8 +106,10 @@ public class RemoteScrollableHitSource extends ScrollableHitSource {
@Override
protected void doStartNextScroll(String scrollId, TimeValue extraKeepAlive, Consumer<? super Response> onResponse) {
execute("POST", scrollPath(), scrollParams(timeValueNanos(searchRequest.scroll().keepAlive().nanos() + extraKeepAlive.nanos())),
scrollEntity(scrollId, remoteVersion), RESPONSE_PARSER, onResponse);
Map<String, String> scrollParams = scrollParams(
timeValueNanos(searchRequest.scroll().keepAlive().nanos() + extraKeepAlive.nanos()),
remoteVersion);
execute("POST", scrollPath(), scrollParams, scrollEntity(scrollId, remoteVersion), RESPONSE_PARSER, onResponse);
}
@Override

View File

@ -35,6 +35,7 @@ import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.clearScrollEntity;
import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.initialSearchEntity;
import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.initialSearchParams;
@ -43,6 +44,7 @@ import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.scrol
import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.scrollParams;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.not;
@ -153,39 +155,60 @@ public class RemoteRequestBuildersTests extends ESTestCase {
if (scroll == null) {
assertThat(params, not(hasKey("scroll")));
} else {
assertEquals(scroll, TimeValue.parseTimeValue(params.get("scroll"), "scroll"));
assertScroll(remoteVersion, params, scroll);
}
assertThat(params, hasEntry("size", Integer.toString(size)));
assertThat(params, fetchVersion == null || fetchVersion == true ? hasEntry("version", null) : not(hasEntry("version", null)));
}
private void assertScroll(Version remoteVersion, Map<String, String> params, TimeValue requested) {
if (remoteVersion.before(Version.V_5_0_0)) {
// Versions of Elasticsearch prior to 5.0 can't parse nanos or micros in TimeValue.
assertThat(params.get("scroll"), not(either(endsWith("nanos")).or(endsWith("micros"))));
if (requested.getStringRep().endsWith("nanos") || requested.getStringRep().endsWith("micros")) {
long millis = (long) Math.ceil(requested.millisFrac());
assertEquals(TimeValue.parseTimeValue(params.get("scroll"), "scroll"), timeValueMillis(millis));
return;
}
}
assertEquals(requested, TimeValue.parseTimeValue(params.get("scroll"), "scroll"));
}
public void testInitialSearchEntity() throws IOException {
Version remoteVersion = Version.fromId(between(0, Version.CURRENT.id));
SearchRequest searchRequest = new SearchRequest();
searchRequest.source(new SearchSourceBuilder());
String query = "{\"match_all\":{}}";
HttpEntity entity = initialSearchEntity(searchRequest, new BytesArray(query));
HttpEntity entity = initialSearchEntity(searchRequest, new BytesArray(query), remoteVersion);
assertEquals(ContentType.APPLICATION_JSON.toString(), entity.getContentType().getValue());
assertEquals("{\"query\":" + query + ",\"_source\":true}",
Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8)));
if (remoteVersion.onOrAfter(Version.fromId(1000099))) {
assertEquals("{\"query\":" + query + ",\"_source\":true}",
Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8)));
} else {
assertEquals("{\"query\":" + query + "}",
Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8)));
}
// Source filtering is included if set up
searchRequest.source().fetchSource(new String[] {"in1", "in2"}, new String[] {"out"});
entity = initialSearchEntity(searchRequest, new BytesArray(query));
entity = initialSearchEntity(searchRequest, new BytesArray(query), remoteVersion);
assertEquals(ContentType.APPLICATION_JSON.toString(), entity.getContentType().getValue());
assertEquals("{\"query\":" + query + ",\"_source\":{\"includes\":[\"in1\",\"in2\"],\"excludes\":[\"out\"]}}",
Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8)));
// Invalid XContent fails
RuntimeException e = expectThrows(RuntimeException.class,
() -> initialSearchEntity(searchRequest, new BytesArray("{}, \"trailing\": {}")));
() -> initialSearchEntity(searchRequest, new BytesArray("{}, \"trailing\": {}"), remoteVersion));
assertThat(e.getCause().getMessage(), containsString("Unexpected character (',' (code 44))"));
e = expectThrows(RuntimeException.class, () -> initialSearchEntity(searchRequest, new BytesArray("{")));
e = expectThrows(RuntimeException.class, () -> initialSearchEntity(searchRequest, new BytesArray("{"), remoteVersion));
assertThat(e.getCause().getMessage(), containsString("Unexpected end-of-input"));
}
public void testScrollParams() {
Version remoteVersion = Version.fromId(between(0, Version.CURRENT.id));
TimeValue scroll = TimeValue.parseTimeValue(randomPositiveTimeValue(), "test");
assertEquals(scroll, TimeValue.parseTimeValue(scrollParams(scroll).get("scroll"), "scroll"));
assertScroll(remoteVersion, scrollParams(scroll, remoteVersion), scroll);
}
public void testScrollEntity() throws IOException {

View File

@ -0,0 +1,82 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
description = """\
Tests reindex-from-remote against some specific versions of
Elasticsearch prior to 5.0. Versions of Elasticsearch >= 5.0
should be able to use the standard launching mechanism which
is more flexible and reliable.
"""
apply plugin: 'elasticsearch.standalone-rest-test'
apply plugin: 'elasticsearch.rest-test'
integTestCluster {
// Whitelist reindexing from the local node so we can test it.
setting 'reindex.remote.whitelist', '127.0.0.1:*'
}
configurations {
oldesFixture
es2
es1
es090
}
dependencies {
oldesFixture project(':test:fixtures:old-elasticsearch')
/* Right now we just test against the latest version of each major we expect
* reindex-from-remote to work against. We could randomize the versions but
* that doesn't seem worth it at this point. */
es2 'org.elasticsearch.distribution.zip:elasticsearch:2.4.5@zip'
es1 'org.elasticsearch:elasticsearch:1.7.6@zip'
es090 'org.elasticsearch:elasticsearch:0.90.13@zip'
}
/* Set up tasks to unzip and run the old versions of ES before running the
* integration tests. */
for (String version : ['2', '1', '090']) {
Task unzip = task("unzipEs${version}", type: Sync) {
Configuration oldEsDependency = configurations['es' + version]
dependsOn oldEsDependency
// Use a closure here to delay resolution of the dependency until we need it
from {
oldEsDependency.collect { zipTree(it) }
}
into temporaryDir
}
Task fixture = task("oldEs${version}Fixture",
type: org.elasticsearch.gradle.test.AntFixture) {
dependsOn project.configurations.oldesFixture
dependsOn unzip
executable = new File(project.javaHome, 'bin/java')
env 'CLASSPATH', "${ -> project.configurations.oldesFixture.asPath }"
args 'oldes.OldElasticsearch',
baseDir,
unzip.temporaryDir,
version == '090'
}
integTestCluster.dependsOn fixture
integTestRunner {
/* Use a closure on the string to delay evaluation until right before we
* run the integration tests so that we can be sure that the file is ready.
*/
systemProperty "es${version}.port", "${ -> fixture.addressAndPort }"
}
}

View File

@ -0,0 +1,108 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.smoketest;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.test.rest.ESRestTestCase;
import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;
import static java.util.Collections.singletonMap;
import static org.hamcrest.Matchers.containsString;
public class ReindexFromOldRemoteIT extends ESRestTestCase {
private void oldEsTestCase(String portPropertyName, String requestsPerSecond) throws IOException {
int oldEsPort = Integer.parseInt(System.getProperty(portPropertyName));
try (RestClient oldEs = RestClient.builder(new HttpHost("127.0.0.1", oldEsPort)).build()) {
try {
HttpEntity entity = new StringEntity("{\"settings\":{\"number_of_shards\": 1}}", ContentType.APPLICATION_JSON);
oldEs.performRequest("PUT", "/test", singletonMap("refresh", "true"), entity);
entity = new StringEntity("{\"test\":\"test\"}", ContentType.APPLICATION_JSON);
oldEs.performRequest("PUT", "/test/doc/testdoc1", singletonMap("refresh", "true"), entity);
oldEs.performRequest("PUT", "/test/doc/testdoc2", singletonMap("refresh", "true"), entity);
oldEs.performRequest("PUT", "/test/doc/testdoc3", singletonMap("refresh", "true"), entity);
oldEs.performRequest("PUT", "/test/doc/testdoc4", singletonMap("refresh", "true"), entity);
oldEs.performRequest("PUT", "/test/doc/testdoc5", singletonMap("refresh", "true"), entity);
entity = new StringEntity(
"{\n"
+ " \"source\":{\n"
+ " \"index\": \"test\",\n"
+ " \"size\": 1,\n"
+ " \"remote\": {\n"
+ " \"host\": \"http://127.0.0.1:" + oldEsPort + "\"\n"
+ " }\n"
+ " },\n"
+ " \"dest\": {\n"
+ " \"index\": \"test\"\n"
+ " }\n"
+ "}",
ContentType.APPLICATION_JSON);
Map<String, String> params = new TreeMap<>();
params.put("refresh", "true");
params.put("pretty", "true");
if (requestsPerSecond != null) {
params.put("requests_per_second", requestsPerSecond);
}
client().performRequest("POST", "/_reindex", params, entity);
Response response = client().performRequest("POST", "test/_search", singletonMap("pretty", "true"));
String result = EntityUtils.toString(response.getEntity());
assertThat(result, containsString("\"_id\" : \"testdoc1\""));
} finally {
oldEs.performRequest("DELETE", "/test");
}
}
}
public void testEs2() throws IOException {
oldEsTestCase("es2.port", null);
}
public void testEs1() throws IOException {
oldEsTestCase("es1.port", null);
}
public void testEs090() throws IOException {
oldEsTestCase("es090.port", null);
}
public void testEs2WithFunnyThrottle() throws IOException {
oldEsTestCase("es2.port", "11"); // 11 requests per second should give us a nice "funny" number on the scroll timeout
}
public void testEs1WithFunnyThrottle() throws IOException {
oldEsTestCase("es1.port", "11"); // 11 requests per second should give us a nice "funny" number on the scroll timeout
}
public void testEs090WithFunnyThrottle() throws IOException {
oldEsTestCase("es090.port", "11"); // 11 requests per second should give us a nice "funny" number on the scroll timeout
}
}

View File

@ -26,6 +26,7 @@ List projects = [
'test:fixtures:example-fixture',
'test:fixtures:hdfs-fixture',
'test:fixtures:krb5kdc-fixture',
'test:fixtures:old-elasticsearch',
'test:logger-usage',
'modules:aggs-matrix-stats',
'modules:analysis-common',
@ -63,6 +64,7 @@ List projects = [
'qa:evil-tests',
'qa:multi-cluster-search',
'qa:no-bootstrap-tests',
'qa:reindex-from-old',
'qa:rolling-upgrade',
'qa:smoke-test-client',
'qa:smoke-test-http',

View File

@ -0,0 +1,32 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
description = """\
Launches versions of Elasticsearch prior to 5.0 for testing.
These need special handling because they do not support writing
a "ports" file with the port on which Elasticsearch is running.
"""
apply plugin: 'elasticsearch.build'
test.enabled = false
dependencies {
// Just for the constants....
compile "org.apache.lucene:lucene-core:${versions.lucene}"
}

View File

@ -0,0 +1,131 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package oldes;
import org.apache.lucene.util.Constants;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* Starts a version of Elasticsearch that has been unzipped into an empty directory,
* instructing it to ask the OS for an unused port, grepping the logs for the port
* it actually got, and writing a {@code ports} file with the port. This is only
* required for versions of Elasticsearch before 5.0 because they do not support
* writing a "ports" file.
*/
public class OldElasticsearch {
public static void main(String[] args) throws IOException {
Path baseDir = Paths.get(args[0]);
Path unzipDir = Paths.get(args[1]);
// 0.90 must be explicitly foregrounded
boolean explicitlyForeground;
switch (args[2]) {
case "true":
explicitlyForeground = true;
break;
case "false":
explicitlyForeground = false;
break;
default:
System.err.println("the third argument must be true or false");
System.exit(1);
return;
}
Iterator<Path> children = Files.list(unzipDir).iterator();
if (false == children.hasNext()) {
System.err.println("expected the es directory to contain a single child directory but contained none.");
System.exit(1);
}
Path esDir = children.next();
if (children.hasNext()) {
System.err.println("expected the es directory to contains a single child directory but contained [" + esDir + "] and ["
+ children.next() + "].");
System.exit(1);
}
if (false == Files.isDirectory(esDir)) {
System.err.println("expected the es directory to contains a single child directory but contained a single child file.");
System.exit(1);
}
Path bin = esDir.resolve("bin").resolve("elasticsearch" + (Constants.WINDOWS ? ".bat" : ""));
Path config = esDir.resolve("config").resolve("elasticsearch.yml");
Files.write(config, Arrays.asList("http.port: 0", "transport.tcp.port: 0", "network.host: 127.0.0.1"), StandardCharsets.UTF_8);
List<String> command = new ArrayList<>();
command.add(bin.toString());
if (explicitlyForeground) {
command.add("-f");
}
command.add("-p");
command.add("../pid");
ProcessBuilder subprocess = new ProcessBuilder(command);
Process process = subprocess.start();
System.out.println("Running " + command);
int pid = 0;
int port = 0;
Pattern pidPattern = Pattern.compile("pid\\[(\\d+)\\]");
Pattern httpPortPattern = Pattern.compile("\\[http\\s+\\].+bound_address.+127\\.0\\.0\\.1:(\\d+)");
try (BufferedReader stdout = new BufferedReader(new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
String line;
while ((line = stdout.readLine()) != null && (pid == 0 || port == 0)) {
System.out.println(line);
Matcher m = pidPattern.matcher(line);
if (m.find()) {
pid = Integer.parseInt(m.group(1));
System.out.println("Found pid: " + pid);
continue;
}
m = httpPortPattern.matcher(line);
if (m.find()) {
port = Integer.parseInt(m.group(1));
System.out.println("Found port: " + port);
continue;
}
}
}
if (port == 0) {
System.err.println("port not found");
System.exit(1);
}
Path tmp = Files.createTempFile(baseDir, null, null);
Files.write(tmp, Integer.toString(port).getBytes(StandardCharsets.UTF_8));
Files.move(tmp, baseDir.resolve("ports"), StandardCopyOption.ATOMIC_MOVE);
}
}

View File

@ -629,7 +629,7 @@ public abstract class ESTestCase extends LuceneTestCase {
return generateRandomStringArray(maxArraySize, maxStringSize, allowNull, true);
}
private static String[] TIME_SUFFIXES = new String[]{"d", "h", "ms", "s", "m"};
private static final String[] TIME_SUFFIXES = new String[]{"d", "h", "ms", "s", "m", "micros", "nanos"};
private static String randomTimeValue(int lower, int upper) {
return randomIntBetween(lower, upper) + randomFrom(TIME_SUFFIXES);