mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-03-24 17:09:48 +00:00
Properly serialize remote query in ReindexRequest (#43596)
This commit modifies the RemoteInfo to clarify that a search query must always be serialized as JSON. Additionally, it adds an assertion to ensure that this is the case. This fixes #43406. Additionally, this PR implements AbstractXContentTestCase for the reindex request. This is related to #43456.
This commit is contained in:
parent
51161a4b0e
commit
2fa6bc5e12
@ -22,9 +22,11 @@ package org.elasticsearch.index.reindex;
|
||||
import org.elasticsearch.client.RestClient;
|
||||
import org.elasticsearch.client.RestClientBuilderTestCase;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.env.TestEnvironment;
|
||||
import org.elasticsearch.index.query.MatchAllQueryBuilder;
|
||||
import org.elasticsearch.watcher.ResourceWatcherService;
|
||||
|
||||
import java.util.ArrayList;
|
||||
@ -38,9 +40,12 @@ import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
public class ReindexFromRemoteBuildRestClientTests extends RestClientBuilderTestCase {
|
||||
|
||||
private final BytesReference matchAll = new BytesArray(new MatchAllQueryBuilder().toString());
|
||||
|
||||
public void testBuildRestClient() throws Exception {
|
||||
for(final String path: new String[]{"", null, "/", "path"}) {
|
||||
RemoteInfo remoteInfo = new RemoteInfo("https", "localhost", 9200, path, new BytesArray("ignored"), null, null, emptyMap(),
|
||||
RemoteInfo remoteInfo = new RemoteInfo("https", "localhost", 9200, path, matchAll, null, null, emptyMap(),
|
||||
RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT);
|
||||
long taskId = randomLong();
|
||||
List<Thread> threads = synchronizedList(new ArrayList<>());
|
||||
@ -64,7 +69,7 @@ public class ReindexFromRemoteBuildRestClientTests extends RestClientBuilderTest
|
||||
for (int i = 0; i < numHeaders; ++i) {
|
||||
headers.put("header" + i, Integer.toString(i));
|
||||
}
|
||||
RemoteInfo remoteInfo = new RemoteInfo("https", "localhost", 9200, null, new BytesArray("ignored"), null, null,
|
||||
RemoteInfo remoteInfo = new RemoteInfo("https", "localhost", 9200, null, matchAll, null, null,
|
||||
headers, RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT);
|
||||
long taskId = randomLong();
|
||||
List<Thread> threads = synchronizedList(new ArrayList<>());
|
||||
|
@ -20,6 +20,7 @@
|
||||
package org.elasticsearch.index.reindex;
|
||||
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.net.UnknownHostException;
|
||||
@ -37,6 +38,9 @@ import static org.elasticsearch.index.reindex.TransportReindexAction.checkRemote
|
||||
* Tests the reindex-from-remote whitelist of remotes.
|
||||
*/
|
||||
public class ReindexFromRemoteWhitelistTests extends ESTestCase {
|
||||
|
||||
private final BytesReference query = new BytesArray("{ \"foo\" : \"bar\" }");
|
||||
|
||||
public void testLocalRequestWithoutWhitelist() {
|
||||
checkRemoteWhitelist(buildRemoteWhitelist(emptyList()), null);
|
||||
}
|
||||
@ -49,7 +53,7 @@ public class ReindexFromRemoteWhitelistTests extends ESTestCase {
|
||||
* Build a {@link RemoteInfo}, defaulting values that we don't care about in this test to values that don't hurt anything.
|
||||
*/
|
||||
private RemoteInfo newRemoteInfo(String host, int port) {
|
||||
return new RemoteInfo(randomAlphaOfLength(5), host, port, null, new BytesArray("test"), null, null, emptyMap(),
|
||||
return new RemoteInfo(randomAlphaOfLength(5), host, port, null, query, null, null, emptyMap(),
|
||||
RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT);
|
||||
}
|
||||
|
||||
@ -63,7 +67,7 @@ public class ReindexFromRemoteWhitelistTests extends ESTestCase {
|
||||
|
||||
public void testWhitelistedByPrefix() {
|
||||
checkRemoteWhitelist(buildRemoteWhitelist(singletonList("*.example.com:9200")),
|
||||
new RemoteInfo(randomAlphaOfLength(5), "es.example.com", 9200, null, new BytesArray("test"), null, null, emptyMap(),
|
||||
new RemoteInfo(randomAlphaOfLength(5), "es.example.com", 9200, null, query, null, null, emptyMap(),
|
||||
RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT));
|
||||
checkRemoteWhitelist(buildRemoteWhitelist(singletonList("*.example.com:9200")),
|
||||
newRemoteInfo("6e134134a1.us-east-1.aws.example.com", 9200));
|
||||
|
@ -199,8 +199,9 @@ public class ReindexRestClientSslTests extends ESTestCase {
|
||||
}
|
||||
|
||||
private RemoteInfo getRemoteInfo() {
|
||||
return new RemoteInfo("https", server.getAddress().getHostName(), server.getAddress().getPort(), "/", new BytesArray("test"),
|
||||
"user", "password", Collections.emptyMap(), RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT);
|
||||
return new RemoteInfo("https", server.getAddress().getHostName(), server.getAddress().getPort(), "/",
|
||||
new BytesArray("{\"match_all\":{}}"), "user", "password", Collections.emptyMap(), RemoteInfo.DEFAULT_SOCKET_TIMEOUT,
|
||||
RemoteInfo.DEFAULT_CONNECT_TIMEOUT);
|
||||
}
|
||||
|
||||
@SuppressForbidden(reason = "use http server")
|
||||
|
@ -32,6 +32,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
@ -61,6 +62,8 @@ public class ReindexSourceTargetValidationTests extends ESTestCase {
|
||||
private static final AutoCreateIndex AUTO_CREATE_INDEX = new AutoCreateIndex(Settings.EMPTY,
|
||||
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), INDEX_NAME_EXPRESSION_RESOLVER);
|
||||
|
||||
private final BytesReference query = new BytesArray("{ \"foo\" : \"bar\" }");
|
||||
|
||||
public void testObviousCases() {
|
||||
fails("target", "target");
|
||||
fails("target", "foo", "bar", "target", "baz");
|
||||
@ -106,10 +109,10 @@ public class ReindexSourceTargetValidationTests extends ESTestCase {
|
||||
|
||||
public void testRemoteInfoSkipsValidation() {
|
||||
// The index doesn't have to exist
|
||||
succeeds(new RemoteInfo(randomAlphaOfLength(5), "test", 9200, null, new BytesArray("test"), null, null, emptyMap(),
|
||||
succeeds(new RemoteInfo(randomAlphaOfLength(5), "test", 9200, null, query, null, null, emptyMap(),
|
||||
RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT), "does_not_exist", "target");
|
||||
// And it doesn't matter if they are the same index. They are considered to be different because the remote one is, well, remote.
|
||||
succeeds(new RemoteInfo(randomAlphaOfLength(5), "test", 9200, null, new BytesArray("test"), null, null, emptyMap(),
|
||||
succeeds(new RemoteInfo(randomAlphaOfLength(5), "test", 9200, null, query, null, null, emptyMap(),
|
||||
RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT), "target", "target");
|
||||
}
|
||||
|
||||
|
@ -51,7 +51,7 @@ public class RoundTripTests extends ESTestCase {
|
||||
reindex.getDestination().index("test");
|
||||
if (randomBoolean()) {
|
||||
int port = between(1, Integer.MAX_VALUE);
|
||||
BytesReference query = new BytesArray(randomAlphaOfLength(5));
|
||||
BytesReference query = new BytesArray("{\"match_all\":{}}");
|
||||
String username = randomBoolean() ? randomAlphaOfLength(5) : null;
|
||||
String password = username != null && randomBoolean() ? randomAlphaOfLength(5) : null;
|
||||
int headersCount = randomBoolean() ? 0 : between(1, 10);
|
||||
|
@ -19,28 +19,28 @@
|
||||
|
||||
package org.elasticsearch.index.reindex.remote;
|
||||
|
||||
import org.elasticsearch.index.reindex.RemoteInfo;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.index.reindex.RemoteInfo;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import static java.util.Collections.emptyMap;
|
||||
|
||||
public class RemoteInfoTests extends ESTestCase {
|
||||
private RemoteInfo newRemoteInfo(String scheme, String prefixPath, String username, String password) {
|
||||
return new RemoteInfo(scheme, "testhost", 12344, prefixPath, new BytesArray("testquery"), username, password, emptyMap(),
|
||||
RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT);
|
||||
return new RemoteInfo(scheme, "testhost", 12344, prefixPath,new BytesArray("{ \"foo\" : \"bar\" }"), username, password,
|
||||
emptyMap(), RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT);
|
||||
}
|
||||
|
||||
public void testToString() {
|
||||
assertEquals("host=testhost port=12344 query=testquery",
|
||||
assertEquals("host=testhost port=12344 query={ \"foo\" : \"bar\" }",
|
||||
newRemoteInfo("http", null, null, null).toString());
|
||||
assertEquals("host=testhost port=12344 query=testquery username=testuser",
|
||||
assertEquals("host=testhost port=12344 query={ \"foo\" : \"bar\" } username=testuser",
|
||||
newRemoteInfo("http", null, "testuser", null).toString());
|
||||
assertEquals("host=testhost port=12344 query=testquery username=testuser password=<<>>",
|
||||
assertEquals("host=testhost port=12344 query={ \"foo\" : \"bar\" } username=testuser password=<<>>",
|
||||
newRemoteInfo("http", null, "testuser", "testpass").toString());
|
||||
assertEquals("scheme=https host=testhost port=12344 query=testquery username=testuser password=<<>>",
|
||||
assertEquals("scheme=https host=testhost port=12344 query={ \"foo\" : \"bar\" } username=testuser password=<<>>",
|
||||
newRemoteInfo("https", null, "testuser", "testpass").toString());
|
||||
assertEquals("scheme=https host=testhost port=12344 pathPrefix=prxy query=testquery username=testuser password=<<>>",
|
||||
assertEquals("scheme=https host=testhost port=12344 pathPrefix=prxy query={ \"foo\" : \"bar\" } username=testuser password=<<>>",
|
||||
newRemoteInfo("https", "prxy", "testuser", "testpass").toString());
|
||||
}
|
||||
}
|
||||
|
@ -33,12 +33,10 @@ import org.elasticsearch.common.logging.DeprecationLogger;
|
||||
import org.elasticsearch.common.lucene.uid.Versions;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.ObjectParser;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||
import org.elasticsearch.index.VersionType;
|
||||
import org.elasticsearch.index.mapper.MapperService;
|
||||
import org.elasticsearch.index.query.QueryBuilder;
|
||||
@ -58,7 +56,6 @@ import static java.util.Objects.requireNonNull;
|
||||
import static org.elasticsearch.action.ValidateActions.addValidationError;
|
||||
import static org.elasticsearch.common.unit.TimeValue.parseTimeValue;
|
||||
import static org.elasticsearch.index.VersionType.INTERNAL;
|
||||
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
|
||||
|
||||
/**
|
||||
* Request to reindex some documents from one index to another. This implements CompositeIndicesRequest but in a misleading way. Rather than
|
||||
@ -313,7 +310,7 @@ public class ReindexRequest extends AbstractBulkIndexByScrollRequest<ReindexRequ
|
||||
builder.startObject("source");
|
||||
if (remoteInfo != null) {
|
||||
builder.field("remote", remoteInfo);
|
||||
builder.rawField("query", remoteInfo.getQuery().streamInput(), builder.contentType());
|
||||
builder.rawField("query", remoteInfo.getQuery().streamInput(), RemoteInfo.QUERY_CONTENT_TYPE.type());
|
||||
}
|
||||
builder.array("index", getSearchRequest().indices());
|
||||
String[] types = getSearchRequest().types();
|
||||
@ -466,7 +463,7 @@ public class ReindexRequest extends AbstractBulkIndexByScrollRequest<ReindexRequ
|
||||
throw new IllegalArgumentException(
|
||||
"Unsupported fields in [remote]: [" + Strings.collectionToCommaDelimitedString(remote.keySet()) + "]");
|
||||
}
|
||||
return new RemoteInfo(scheme, host, port, pathPrefix, queryForRemote(source),
|
||||
return new RemoteInfo(scheme, host, port, pathPrefix, RemoteInfo.queryForRemote(source),
|
||||
username, password, headers, socketTimeout, connectTimeout);
|
||||
}
|
||||
|
||||
@ -505,20 +502,6 @@ public class ReindexRequest extends AbstractBulkIndexByScrollRequest<ReindexRequ
|
||||
return string == null ? defaultValue : parseTimeValue(string, name);
|
||||
}
|
||||
|
||||
private static BytesReference queryForRemote(Map<String, Object> source) throws IOException {
|
||||
XContentBuilder builder = JsonXContent.contentBuilder().prettyPrint();
|
||||
Object query = source.remove("query");
|
||||
if (query == null) {
|
||||
return BytesReference.bytes(matchAllQuery().toXContent(builder, ToXContent.EMPTY_PARAMS));
|
||||
}
|
||||
if (!(query instanceof Map)) {
|
||||
throw new IllegalArgumentException("Expected [query] to be an object but was [" + query + "]");
|
||||
}
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, Object> map = (Map<String, Object>) query;
|
||||
return BytesReference.bytes(builder.map(map));
|
||||
}
|
||||
|
||||
static void setMaxDocsValidateIdentical(AbstractBulkByScrollRequest<?> request, int maxDocs) {
|
||||
if (request.getMaxDocs() != AbstractBulkByScrollRequest.MAX_DOCS_ALL_MATCHES && request.getMaxDocs() != maxDocs) {
|
||||
throw new IllegalArgumentException("[max_docs] set to two different values [" + request.getMaxDocs() + "]" +
|
||||
|
@ -26,16 +26,24 @@ import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.DeprecationHandler;
|
||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.common.xcontent.XContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
||||
import static java.util.Collections.unmodifiableMap;
|
||||
import static java.util.Objects.requireNonNull;
|
||||
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
|
||||
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
|
||||
|
||||
public class RemoteInfo implements Writeable, ToXContentObject {
|
||||
/**
|
||||
@ -47,6 +55,8 @@ public class RemoteInfo implements Writeable, ToXContentObject {
|
||||
*/
|
||||
public static final TimeValue DEFAULT_CONNECT_TIMEOUT = timeValueSeconds(30);
|
||||
|
||||
public static final XContent QUERY_CONTENT_TYPE = JsonXContent.jsonXContent;
|
||||
|
||||
private final String scheme;
|
||||
private final String host;
|
||||
private final int port;
|
||||
@ -66,6 +76,7 @@ public class RemoteInfo implements Writeable, ToXContentObject {
|
||||
|
||||
public RemoteInfo(String scheme, String host, int port, String pathPrefix, BytesReference query, String username, String password,
|
||||
Map<String, String> headers, TimeValue socketTimeout, TimeValue connectTimeout) {
|
||||
assert isQueryJson(query) : "Query does not appear to be JSON";
|
||||
this.scheme = requireNonNull(scheme, "[scheme] must be specified to reindex from a remote cluster");
|
||||
this.host = requireNonNull(host, "[host] must be specified to reindex from a remote cluster");
|
||||
this.port = port;
|
||||
@ -212,4 +223,50 @@ public class RemoteInfo implements Writeable, ToXContentObject {
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
RemoteInfo that = (RemoteInfo) o;
|
||||
return port == that.port &&
|
||||
Objects.equals(scheme, that.scheme) &&
|
||||
Objects.equals(host, that.host) &&
|
||||
Objects.equals(pathPrefix, that.pathPrefix) &&
|
||||
Objects.equals(query, that.query) &&
|
||||
Objects.equals(username, that.username) &&
|
||||
Objects.equals(password, that.password) &&
|
||||
Objects.equals(headers, that.headers) &&
|
||||
Objects.equals(socketTimeout, that.socketTimeout) &&
|
||||
Objects.equals(connectTimeout, that.connectTimeout);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(scheme, host, port, pathPrefix, query, username, password, headers, socketTimeout, connectTimeout);
|
||||
}
|
||||
|
||||
static BytesReference queryForRemote(Map<String, Object> source) throws IOException {
|
||||
XContentBuilder builder = XContentBuilder.builder(QUERY_CONTENT_TYPE).prettyPrint();
|
||||
Object query = source.remove("query");
|
||||
if (query == null) {
|
||||
return BytesReference.bytes(matchAllQuery().toXContent(builder, ToXContent.EMPTY_PARAMS));
|
||||
}
|
||||
if (!(query instanceof Map)) {
|
||||
throw new IllegalArgumentException("Expected [query] to be an object but was [" + query + "]");
|
||||
}
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, Object> map = (Map<String, Object>) query;
|
||||
return BytesReference.bytes(builder.map(map));
|
||||
}
|
||||
|
||||
private static boolean isQueryJson(BytesReference bytesReference) {
|
||||
try (XContentParser parser = QUERY_CONTENT_TYPE.createParser(NamedXContentRegistry.EMPTY,
|
||||
DeprecationHandler.THROW_UNSUPPORTED_OPERATION, bytesReference.streamInput())) {
|
||||
Map<String, Object> query = parser.map();
|
||||
return true;
|
||||
} catch (IOException e) {
|
||||
throw new AssertionError("Could not parse JSON", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -21,7 +21,9 @@ package org.elasticsearch.index.reindex;
|
||||
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.support.ActiveShardCount;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.tasks.TaskId;
|
||||
import org.elasticsearch.test.AbstractXContentTestCase;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import static org.elasticsearch.common.unit.TimeValue.parseTimeValue;
|
||||
@ -29,9 +31,12 @@ import static org.elasticsearch.common.unit.TimeValue.parseTimeValue;
|
||||
/**
|
||||
* Shared superclass for testing reindex and friends. In particular it makes sure to test the slice features.
|
||||
*/
|
||||
public abstract class AbstractBulkByScrollRequestTestCase<R extends AbstractBulkByScrollRequest<R>> extends ESTestCase {
|
||||
public abstract class AbstractBulkByScrollRequestTestCase<R extends AbstractBulkByScrollRequest<R> & ToXContent>
|
||||
extends AbstractXContentTestCase<R> {
|
||||
|
||||
public void testForSlice() {
|
||||
R original = newRequest();
|
||||
extraRandomizationForSlice(original);
|
||||
original.setAbortOnVersionConflict(randomBoolean());
|
||||
original.setRefresh(randomBoolean());
|
||||
original.setTimeout(parseTimeValue(randomPositiveTimeValue(), "timeout"));
|
||||
|
@ -22,8 +22,11 @@ package org.elasticsearch.index.reindex;
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.apache.lucene.util.TestUtil.randomSimpleString;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
@ -124,4 +127,28 @@ public class DeleteByQueryRequestTests extends AbstractBulkByScrollRequestTestCa
|
||||
|
||||
assertThat(e, is(nullValue()));
|
||||
}
|
||||
|
||||
// TODO: Implement standard to/from x-content parsing tests
|
||||
|
||||
@Override
|
||||
protected DeleteByQueryRequest createTestInstance() {
|
||||
return newRequest();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected DeleteByQueryRequest doParseInstance(XContentParser parser) throws IOException {
|
||||
XContentParser.Token token;
|
||||
while ((token = parser.nextToken()) != null) {
|
||||
}
|
||||
return newRequest();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean supportsUnknownFields() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void assertEqualInstances(DeleteByQueryRequest expectedInstance, DeleteByQueryRequest newInstance) {
|
||||
}
|
||||
}
|
||||
|
@ -22,12 +22,19 @@ package org.elasticsearch.index.reindex;
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||
import org.elasticsearch.index.query.TermQueryBuilder;
|
||||
import org.elasticsearch.search.SearchModule;
|
||||
import org.elasticsearch.search.slice.SliceBuilder;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
@ -41,12 +48,101 @@ import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
|
||||
*/
|
||||
public class ReindexRequestTests extends AbstractBulkByScrollRequestTestCase<ReindexRequest> {
|
||||
|
||||
private final BytesReference matchAll = new BytesArray("{ \"foo\" : \"bar\" }");
|
||||
|
||||
@Override
|
||||
protected NamedWriteableRegistry writableRegistry() {
|
||||
SearchModule searchModule = new SearchModule(Settings.EMPTY, false, Collections.emptyList());
|
||||
return new NamedWriteableRegistry(searchModule.getNamedWriteables());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected NamedXContentRegistry xContentRegistry() {
|
||||
SearchModule searchModule = new SearchModule(Settings.EMPTY, false, Collections.emptyList());
|
||||
return new NamedXContentRegistry(searchModule.getNamedXContents());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean enableWarningsCheck() {
|
||||
// There sometimes will be a warning about specifying types in reindex requests being deprecated.
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ReindexRequest createTestInstance() {
|
||||
ReindexRequest reindexRequest = new ReindexRequest();
|
||||
reindexRequest.setSourceIndices("source");
|
||||
reindexRequest.setDestIndex("destination");
|
||||
|
||||
if (randomBoolean()) {
|
||||
try (XContentBuilder builder = JsonXContent.contentBuilder().prettyPrint()) {
|
||||
BytesReference query = BytesReference.bytes(matchAllQuery().toXContent(builder, ToXContent.EMPTY_PARAMS));
|
||||
reindexRequest.setRemoteInfo(new RemoteInfo(randomAlphaOfLength(5), randomAlphaOfLength(5), between(1, Integer.MAX_VALUE),
|
||||
null, query, "user", "pass", emptyMap(), RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT));
|
||||
} catch (IOException e) {
|
||||
throw new AssertionError(e);
|
||||
}
|
||||
}
|
||||
|
||||
if (randomBoolean()) {
|
||||
reindexRequest.setSourceBatchSize(randomInt(100));
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
reindexRequest.setDestDocType("type");
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
reindexRequest.setDestOpType("create");
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
reindexRequest.setDestPipeline("my_pipeline");
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
reindexRequest.setDestRouting("=cat");
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
reindexRequest.setMaxDocs(randomIntBetween(100, 1000));
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
reindexRequest.setAbortOnVersionConflict(false);
|
||||
}
|
||||
|
||||
if (reindexRequest.getRemoteInfo() == null && randomBoolean()) {
|
||||
reindexRequest.setSourceQuery(new TermQueryBuilder("foo", "fooval"));
|
||||
}
|
||||
|
||||
return reindexRequest;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ReindexRequest doParseInstance(XContentParser parser) throws IOException {
|
||||
return ReindexRequest.fromXContent(parser);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean supportsUnknownFields() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void assertEqualInstances(ReindexRequest expectedInstance, ReindexRequest newInstance) {
|
||||
assertNotSame(newInstance, expectedInstance);
|
||||
assertArrayEquals(expectedInstance.getSearchRequest().indices(), newInstance.getSearchRequest().indices());
|
||||
assertEquals(expectedInstance.getSearchRequest(), newInstance.getSearchRequest());
|
||||
assertEquals(expectedInstance.getMaxDocs(), newInstance.getMaxDocs());
|
||||
assertEquals(expectedInstance.getSlices(), newInstance.getSlices());
|
||||
assertEquals(expectedInstance.isAbortOnVersionConflict(), newInstance.isAbortOnVersionConflict());
|
||||
assertEquals(expectedInstance.getRemoteInfo(), newInstance.getRemoteInfo());
|
||||
assertEquals(expectedInstance.getDestination().getPipeline(), newInstance.getDestination().getPipeline());
|
||||
assertEquals(expectedInstance.getDestination().routing(), newInstance.getDestination().routing());
|
||||
assertEquals(expectedInstance.getDestination().opType(), newInstance.getDestination().opType());
|
||||
assertEquals(expectedInstance.getDestination().type(), newInstance.getDestination().type());
|
||||
}
|
||||
|
||||
public void testReindexFromRemoteDoesNotSupportSearchQuery() {
|
||||
ReindexRequest reindex = newRequest();
|
||||
reindex.setRemoteInfo(
|
||||
new RemoteInfo(randomAlphaOfLength(5), randomAlphaOfLength(5), between(1, Integer.MAX_VALUE), null,
|
||||
new BytesArray("real_query"), null, null, emptyMap(),
|
||||
RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT));
|
||||
matchAll, null, null, emptyMap(), RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT));
|
||||
reindex.getSearchRequest().source().query(matchAllQuery()); // Unsupported place to put query
|
||||
ActionRequestValidationException e = reindex.validate();
|
||||
assertEquals("Validation Failed: 1: reindex from remote sources should use RemoteInfo's query instead of source's query;",
|
||||
@ -57,8 +153,7 @@ public class ReindexRequestTests extends AbstractBulkByScrollRequestTestCase<Rei
|
||||
ReindexRequest reindex = newRequest();
|
||||
reindex.setRemoteInfo(
|
||||
new RemoteInfo(randomAlphaOfLength(5), randomAlphaOfLength(5), between(1, Integer.MAX_VALUE), null,
|
||||
new BytesArray("real_query"), null, null, emptyMap(),
|
||||
RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT));
|
||||
matchAll, null, null, emptyMap(), RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT));
|
||||
reindex.setSlices(between(2, Integer.MAX_VALUE));
|
||||
ActionRequestValidationException e = reindex.validate();
|
||||
assertEquals(
|
||||
@ -80,10 +175,9 @@ public class ReindexRequestTests extends AbstractBulkByScrollRequestTestCase<Rei
|
||||
original.setScript(mockScript(randomAlphaOfLength(5)));
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
original.setRemoteInfo(new RemoteInfo(randomAlphaOfLength(5), randomAlphaOfLength(5), between(1, 10000),
|
||||
null, new BytesArray(randomAlphaOfLength(5)), null, null, emptyMap(),
|
||||
parseTimeValue(randomPositiveTimeValue(), "socket_timeout"),
|
||||
parseTimeValue(randomPositiveTimeValue(), "connect_timeout")));
|
||||
original.setRemoteInfo(new RemoteInfo(randomAlphaOfLength(5), randomAlphaOfLength(5), between(1, 10000), null, matchAll, null,
|
||||
null, emptyMap(), parseTimeValue(randomPositiveTimeValue(), "socket_timeout"),
|
||||
parseTimeValue(randomPositiveTimeValue(), "connect_timeout")));
|
||||
}
|
||||
}
|
||||
|
||||
@ -230,5 +324,4 @@ public class ReindexRequestTests extends AbstractBulkByScrollRequestTestCase<Rei
|
||||
|
||||
return ReindexRequest.buildRemoteInfo(source);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -20,6 +20,9 @@
|
||||
package org.elasticsearch.index.reindex;
|
||||
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.apache.lucene.util.TestUtil.randomSimpleString;
|
||||
|
||||
@ -77,4 +80,28 @@ public class UpdateByQueryRequestTests extends AbstractBulkByScrollRequestTestCa
|
||||
assertEquals(original.getScript(), forSliced.getScript());
|
||||
assertEquals(original.getPipeline(), forSliced.getPipeline());
|
||||
}
|
||||
|
||||
// TODO: Implement standard to/from x-content parsing tests
|
||||
|
||||
@Override
|
||||
protected UpdateByQueryRequest createTestInstance() {
|
||||
return newRequest();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected UpdateByQueryRequest doParseInstance(XContentParser parser) throws IOException {
|
||||
XContentParser.Token token;
|
||||
while ((token = parser.nextToken()) != null) {
|
||||
}
|
||||
return newRequest();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean supportsUnknownFields() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void assertEqualInstances(UpdateByQueryRequest expectedInstance, UpdateByQueryRequest newInstance) {
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user