Fix reindex with a remote source on a version before 2.0.0 (#23805)

Send the scroll id in the body as plain text when the remote version is before 2.0.0
This commit is contained in:
Jim Ferenczi 2017-03-31 09:07:43 +02:00 committed by GitHub
parent 9abb125417
commit f3a925fdbe
3 changed files with 32 additions and 5 deletions

View File

@ -171,7 +171,11 @@ final class RemoteRequestBuilders {
return singletonMap("scroll", keepAlive.toString());
}
static HttpEntity scrollEntity(String scroll) {
static HttpEntity scrollEntity(String scroll, Version remoteVersion) {
if (remoteVersion.before(Version.V_2_0_0)) {
// Versions before 2.0.0 extract the plain scroll_id from the body
return new StringEntity(scroll, ContentType.TEXT_PLAIN);
}
try (XContentBuilder entity = JsonXContent.contentBuilder()) {
return new StringEntity(entity.startObject()
.field("scroll_id", scroll)
@ -181,7 +185,11 @@ final class RemoteRequestBuilders {
}
}
static HttpEntity clearScrollEntity(String scroll) {
static HttpEntity clearScrollEntity(String scroll, Version remoteVersion) {
if (remoteVersion.before(Version.V_2_0_0)) {
// Versions before 2.0.0 extract the plain scroll_id from the body
return new StringEntity(scroll, ContentType.TEXT_PLAIN);
}
try (XContentBuilder entity = JsonXContent.contentBuilder()) {
return new StringEntity(entity.startObject()
.array("scroll_id", scroll)

View File

@ -107,12 +107,12 @@ public class RemoteScrollableHitSource extends ScrollableHitSource {
@Override
protected void doStartNextScroll(String scrollId, TimeValue extraKeepAlive, Consumer<? super Response> onResponse) {
execute("POST", scrollPath(), scrollParams(timeValueNanos(searchRequest.scroll().keepAlive().nanos() + extraKeepAlive.nanos())),
scrollEntity(scrollId), RESPONSE_PARSER, onResponse);
scrollEntity(scrollId, remoteVersion), RESPONSE_PARSER, onResponse);
}
@Override
protected void clearScroll(String scrollId, Runnable onCompletion) {
client.performRequestAsync("DELETE", scrollPath(), emptyMap(), clearScrollEntity(scrollId), new ResponseListener() {
client.performRequestAsync("DELETE", scrollPath(), emptyMap(), clearScrollEntity(scrollId, remoteVersion), new ResponseListener() {
@Override
public void onSuccess(org.elasticsearch.client.Response response) {
logger.debug("Successfully cleared [{}]", scrollId);

View File

@ -39,6 +39,7 @@ import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.initi
import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.initialSearchParams;
import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.initialSearchPath;
import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.scrollEntity;
import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.clearScrollEntity;
import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.scrollParams;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.either;
@ -185,9 +186,27 @@ public class RemoteRequestBuildersTests extends ESTestCase {
public void testScrollEntity() throws IOException {
String scroll = randomAsciiOfLength(30);
HttpEntity entity = scrollEntity(scroll);
HttpEntity entity = scrollEntity(scroll, Version.V_5_0_0);
assertEquals(ContentType.APPLICATION_JSON.toString(), entity.getContentType().getValue());
assertThat(Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8)),
containsString("\"" + scroll + "\""));
// Test with version < 2.0.0
entity = scrollEntity(scroll, Version.fromId(1070499));
assertEquals(ContentType.TEXT_PLAIN.toString(), entity.getContentType().getValue());
assertEquals(scroll, Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8)));
}
public void testClearScrollEntity() throws IOException {
String scroll = randomAsciiOfLength(30);
HttpEntity entity = clearScrollEntity(scroll, Version.V_5_0_0);
assertEquals(ContentType.APPLICATION_JSON.toString(), entity.getContentType().getValue());
assertThat(Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8)),
containsString("\"" + scroll + "\""));
// Test with version < 2.0.0
entity = clearScrollEntity(scroll, Version.fromId(1070499));
assertEquals(ContentType.TEXT_PLAIN.toString(), entity.getContentType().getValue());
assertEquals(scroll, Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8)));
}
}