HLREST: add reindex API (#32679)

Adds the reindex API to the high level REST client.
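A minimal synchronous usage sketch of the new API (index names are illustrative; `client` is an existing `RestHighLevelClient`):

    ReindexRequest request = new ReindexRequest();
    request.setSourceIndices("source");
    request.setDestIndex("dest");
    BulkByScrollResponse response = client.reindex(request, RequestOptions.DEFAULT);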
Sohaib Iftikhar 2018-08-28 19:02:23 +02:00 committed by Nik Everett
parent 353112a033
commit 7f5e29ddb2
27 changed files with 1644 additions and 68 deletions


@@ -106,6 +106,7 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.rankeval.RankEvalRequest;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.protocol.xpack.XPackInfoRequest;
import org.elasticsearch.protocol.xpack.XPackUsageRequest;
import org.elasticsearch.protocol.xpack.license.DeleteLicenseRequest;
@@ -820,6 +821,21 @@ final class RequestConverters {
return request;
}
static Request reindex(ReindexRequest reindexRequest) throws IOException {
String endpoint = new EndpointBuilder().addPathPart("_reindex").build();
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
Params params = new Params(request)
.withRefresh(reindexRequest.isRefresh())
.withTimeout(reindexRequest.getTimeout())
.withWaitForActiveShards(reindexRequest.getWaitForActiveShards());
if (reindexRequest.getScrollTime() != null) {
params.putParam("scroll", reindexRequest.getScrollTime());
}
request.setEntity(createEntity(reindexRequest, REQUEST_BODY_CONTENT_TYPE));
return request;
}
static Request rollover(RolloverRequest rolloverRequest) throws IOException {
String endpoint = new EndpointBuilder().addPathPart(rolloverRequest.getAlias()).addPathPartAsIs("_rollover")
.addPathPart(rolloverRequest.getNewIndexName()).build();


@@ -64,6 +64,8 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.rankeval.RankEvalRequest;
import org.elasticsearch.index.rankeval.RankEvalResponse;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.plugins.spi.NamedXContentProvider;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestStatus;
@@ -395,6 +397,33 @@ public class RestHighLevelClient implements Closeable {
performRequestAsyncAndParseEntity(bulkRequest, RequestConverters::bulk, options, BulkResponse::fromXContent, listener, emptySet());
}
/**
* Executes a reindex request.
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html">Reindex API on elastic.co</a>
* @param reindexRequest the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
* @throws IOException in case there is a problem sending the request or parsing back the response
*/
public final BulkByScrollResponse reindex(ReindexRequest reindexRequest, RequestOptions options) throws IOException {
return performRequestAndParseEntity(
reindexRequest, RequestConverters::reindex, options, BulkByScrollResponse::fromXContent, emptySet()
);
}
/**
* Asynchronously executes a reindex request.
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html">Reindex API on elastic.co</a>
* @param reindexRequest the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
*/
public final void reindexAsync(ReindexRequest reindexRequest, RequestOptions options, ActionListener<BulkByScrollResponse> listener) {
performRequestAsyncAndParseEntity(
reindexRequest, RequestConverters::reindex, options, BulkByScrollResponse::fromXContent, listener, emptySet()
);
}
/**
* Pings the remote Elasticsearch cluster and returns true if the ping succeeded, false otherwise
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized


@@ -41,12 +41,16 @@ import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.query.IdsQueryBuilder;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
@@ -624,6 +628,69 @@ public class CrudIT extends ESRestHighLevelClientTestCase {
validateBulkResponses(nbItems, errors, bulkResponse, bulkRequest);
}
public void testReindex() throws IOException {
final String sourceIndex = "source1";
final String destinationIndex = "dest";
{
// Prepare
Settings settings = Settings.builder()
.put("number_of_shards", 1)
.put("number_of_replicas", 0)
.build();
createIndex(sourceIndex, settings);
createIndex(destinationIndex, settings);
assertEquals(
RestStatus.OK,
highLevelClient().bulk(
new BulkRequest()
.add(new IndexRequest(sourceIndex, "type", "1")
.source(Collections.singletonMap("foo", "bar"), XContentType.JSON))
.add(new IndexRequest(sourceIndex, "type", "2")
.source(Collections.singletonMap("foo2", "bar2"), XContentType.JSON))
.setRefreshPolicy(RefreshPolicy.IMMEDIATE),
RequestOptions.DEFAULT
).status()
);
}
{
// test1: create one doc in dest
ReindexRequest reindexRequest = new ReindexRequest();
reindexRequest.setSourceIndices(sourceIndex);
reindexRequest.setDestIndex(destinationIndex);
reindexRequest.setSourceQuery(new IdsQueryBuilder().addIds("1").types("type"));
reindexRequest.setRefresh(true);
BulkByScrollResponse bulkResponse = execute(reindexRequest, highLevelClient()::reindex, highLevelClient()::reindexAsync);
assertEquals(1, bulkResponse.getCreated());
assertEquals(1, bulkResponse.getTotal());
assertEquals(0, bulkResponse.getDeleted());
assertEquals(0, bulkResponse.getNoops());
assertEquals(0, bulkResponse.getVersionConflicts());
assertEquals(1, bulkResponse.getBatches());
assertTrue(bulkResponse.getTook().getMillis() > 0);
assertEquals(0, bulkResponse.getBulkFailures().size());
assertEquals(0, bulkResponse.getSearchFailures().size());
}
{
// test2: create 1 and update 1
ReindexRequest reindexRequest = new ReindexRequest();
reindexRequest.setSourceIndices(sourceIndex);
reindexRequest.setDestIndex(destinationIndex);
BulkByScrollResponse bulkResponse = execute(reindexRequest, highLevelClient()::reindex, highLevelClient()::reindexAsync);
assertEquals(1, bulkResponse.getCreated());
assertEquals(2, bulkResponse.getTotal());
assertEquals(1, bulkResponse.getUpdated());
assertEquals(0, bulkResponse.getDeleted());
assertEquals(0, bulkResponse.getNoops());
assertEquals(0, bulkResponse.getVersionConflicts());
assertEquals(1, bulkResponse.getBatches());
assertTrue(bulkResponse.getTook().getMillis() > 0);
assertEquals(0, bulkResponse.getBulkFailures().size());
assertEquals(0, bulkResponse.getSearchFailures().size());
}
}
public void testBulkProcessorIntegration() throws IOException {
int nbItems = randomIntBetween(10, 100);
boolean[] errors = new boolean[nbItems];


@@ -116,6 +116,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.RandomCreateIndexGenerator;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.query.QueryBuilder;
@@ -126,6 +127,8 @@ import org.elasticsearch.index.rankeval.RankEvalRequest;
import org.elasticsearch.index.rankeval.RankEvalSpec;
import org.elasticsearch.index.rankeval.RatedRequest;
import org.elasticsearch.index.rankeval.RestRankEvalAction;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.index.reindex.RemoteInfo;
import org.elasticsearch.protocol.xpack.XPackInfoRequest;
import org.elasticsearch.protocol.xpack.migration.IndexUpgradeInfoRequest;
import org.elasticsearch.protocol.xpack.watcher.DeleteWatchRequest;
@@ -172,6 +175,7 @@ import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;
import static org.elasticsearch.client.RequestConverters.REQUEST_BODY_CONTENT_TYPE;
import static org.elasticsearch.client.RequestConverters.enforceSameContentType;
@@ -179,6 +183,7 @@ import static org.elasticsearch.index.RandomCreateIndexGenerator.randomAliases;
import static org.elasticsearch.index.RandomCreateIndexGenerator.randomCreateIndexRequest;
import static org.elasticsearch.index.RandomCreateIndexGenerator.randomIndexSettings;
import static org.elasticsearch.index.alias.RandomAliasActionsGenerator.randomAliasAction;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.search.RandomSearchRequestGenerator.randomSearchRequest;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertToXContentEquivalent;
import static org.hamcrest.CoreMatchers.equalTo;
@@ -407,6 +412,64 @@ public class RequestConvertersTests extends ESTestCase {
assertToXContentBody(indicesAliasesRequest, request.getEntity());
}
public void testReindex() throws IOException {
ReindexRequest reindexRequest = new ReindexRequest();
reindexRequest.setSourceIndices("source_idx");
reindexRequest.setDestIndex("dest_idx");
Map<String, String> expectedParams = new HashMap<>();
if (randomBoolean()) {
XContentBuilder builder = JsonXContent.contentBuilder().prettyPrint();
RemoteInfo remoteInfo = new RemoteInfo("http", "remote-host", 9200, null,
BytesReference.bytes(matchAllQuery().toXContent(builder, ToXContent.EMPTY_PARAMS)),
"user",
"pass",
emptyMap(),
RemoteInfo.DEFAULT_SOCKET_TIMEOUT,
RemoteInfo.DEFAULT_CONNECT_TIMEOUT
);
reindexRequest.setRemoteInfo(remoteInfo);
}
if (randomBoolean()) {
reindexRequest.setSourceDocTypes("doc", "tweet");
}
if (randomBoolean()) {
reindexRequest.setSourceBatchSize(randomInt(100));
}
if (randomBoolean()) {
reindexRequest.setDestDocType("tweet_and_doc");
}
if (randomBoolean()) {
reindexRequest.setDestOpType("create");
}
if (randomBoolean()) {
reindexRequest.setDestPipeline("my_pipeline");
}
if (randomBoolean()) {
reindexRequest.setDestRouting("=cat");
}
if (randomBoolean()) {
reindexRequest.setSize(randomIntBetween(100, 1000));
}
if (randomBoolean()) {
reindexRequest.setAbortOnVersionConflict(false);
}
if (randomBoolean()) {
String ts = randomTimeValue();
reindexRequest.setScroll(TimeValue.parseTimeValue(ts, "scroll"));
}
if (reindexRequest.getRemoteInfo() == null && randomBoolean()) {
reindexRequest.setSourceQuery(new TermQueryBuilder("foo", "fooval"));
}
setRandomTimeout(reindexRequest::setTimeout, ReplicationRequest.DEFAULT_TIMEOUT, expectedParams);
setRandomWaitForActiveShards(reindexRequest::setWaitForActiveShards, ActiveShardCount.DEFAULT, expectedParams);
expectedParams.put("scroll", reindexRequest.getScrollTime().getStringRep());
Request request = RequestConverters.reindex(reindexRequest);
assertEquals("/_reindex", request.getEndpoint());
assertEquals(HttpPost.METHOD_NAME, request.getMethod());
assertEquals(expectedParams, request.getParameters());
assertToXContentBody(reindexRequest, request.getEntity());
}
public void testPutMapping() throws IOException {
PutMappingRequest putMappingRequest = new PutMappingRequest();


@@ -660,7 +660,6 @@ public class RestHighLevelClientTests extends ESTestCase {
"indices.put_alias",
"mtermvectors",
"put_script",
"reindex",
"reindex_rethrottle",
"render_search_template",
"scripts_painless_execute",


@@ -50,6 +50,8 @@ import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
@@ -59,13 +61,22 @@ import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.index.reindex.RemoteInfo;
import org.elasticsearch.index.reindex.ScrollableHitSource;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.search.sort.SortOrder;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -750,6 +761,144 @@ public class CRUDDocumentationIT extends ESRestHighLevelClientTestCase {
}
}
public void testReindex() throws Exception {
RestHighLevelClient client = highLevelClient();
{
String mapping =
"\"doc\": {\n" +
" \"properties\": {\n" +
" \"user\": {\n" +
" \"type\": \"text\"\n" +
" },\n" +
" \"field1\": {\n" +
" \"type\": \"integer\"\n" +
" },\n" +
" \"field2\": {\n" +
" \"type\": \"integer\"\n" +
" }\n" +
" }\n" +
" }";
createIndex("source1", Settings.EMPTY, mapping);
createIndex("source2", Settings.EMPTY, mapping);
createPipeline("my_pipeline");
}
{
// tag::reindex-request
ReindexRequest request = new ReindexRequest(); // <1>
request.setSourceIndices("source1", "source2"); // <2>
request.setDestIndex("dest"); // <3>
// end::reindex-request
// tag::reindex-request-versionType
request.setDestVersionType(VersionType.EXTERNAL); // <1>
// end::reindex-request-versionType
// tag::reindex-request-opType
request.setDestOpType("create"); // <1>
// end::reindex-request-opType
// tag::reindex-request-conflicts
request.setConflicts("proceed"); // <1>
// end::reindex-request-conflicts
// tag::reindex-request-typeOrQuery
request.setSourceDocTypes("doc"); // <1>
request.setSourceQuery(new TermQueryBuilder("user", "kimchy")); // <2>
// end::reindex-request-typeOrQuery
// tag::reindex-request-size
request.setSize(10); // <1>
// end::reindex-request-size
// tag::reindex-request-sourceSize
request.setSourceBatchSize(100); // <1>
// end::reindex-request-sourceSize
// tag::reindex-request-pipeline
request.setDestPipeline("my_pipeline"); // <1>
// end::reindex-request-pipeline
// tag::reindex-request-sort
request.addSortField("field1", SortOrder.DESC); // <1>
request.addSortField("field2", SortOrder.ASC); // <2>
// end::reindex-request-sort
// tag::reindex-request-script
request.setScript(
new Script(
ScriptType.INLINE, "painless",
"if (ctx._source.user == 'kimchy') {ctx._source.likes++;}",
Collections.emptyMap())); // <1>
// end::reindex-request-script
// tag::reindex-request-remote
request.setRemoteInfo(
new RemoteInfo(
"https", "localhost", 9002, null, new BytesArray(new MatchAllQueryBuilder().toString()),
"user", "pass", Collections.emptyMap(), new TimeValue(100, TimeUnit.MILLISECONDS),
new TimeValue(100, TimeUnit.SECONDS)
)
); // <1>
// end::reindex-request-remote
request.setRemoteInfo(null); // Remove it for tests
// tag::reindex-request-timeout
request.setTimeout(TimeValue.timeValueMinutes(2)); // <1>
// end::reindex-request-timeout
// tag::reindex-request-refresh
request.setRefresh(true); // <1>
// end::reindex-request-refresh
// tag::reindex-request-slices
request.setSlices(2); // <1>
// end::reindex-request-slices
// tag::reindex-request-scroll
request.setScroll(TimeValue.timeValueMinutes(10)); // <1>
// end::reindex-request-scroll
// tag::reindex-execute
BulkByScrollResponse bulkResponse = client.reindex(request, RequestOptions.DEFAULT);
// end::reindex-execute
assertSame(0, bulkResponse.getSearchFailures().size());
assertSame(0, bulkResponse.getBulkFailures().size());
// tag::reindex-response
TimeValue timeTaken = bulkResponse.getTook(); // <1>
boolean timedOut = bulkResponse.isTimedOut(); // <2>
long totalDocs = bulkResponse.getTotal(); // <3>
long updatedDocs = bulkResponse.getUpdated(); // <4>
long createdDocs = bulkResponse.getCreated(); // <5>
long deletedDocs = bulkResponse.getDeleted(); // <6>
long batches = bulkResponse.getBatches(); // <7>
long noops = bulkResponse.getNoops(); // <8>
long versionConflicts = bulkResponse.getVersionConflicts(); // <9>
long bulkRetries = bulkResponse.getBulkRetries(); // <10>
long searchRetries = bulkResponse.getSearchRetries(); // <11>
TimeValue throttledMillis = bulkResponse.getStatus().getThrottled(); // <12>
TimeValue throttledUntilMillis = bulkResponse.getStatus().getThrottledUntil(); // <13>
List<ScrollableHitSource.SearchFailure> searchFailures = bulkResponse.getSearchFailures(); // <14>
List<BulkItemResponse.Failure> bulkFailures = bulkResponse.getBulkFailures(); // <15>
// end::reindex-response
}
{
ReindexRequest request = new ReindexRequest();
request.setSourceIndices("source1");
request.setDestIndex("dest");
// tag::reindex-execute-listener
ActionListener<BulkByScrollResponse> listener = new ActionListener<BulkByScrollResponse>() {
@Override
public void onResponse(BulkByScrollResponse bulkResponse) {
// <1>
}
@Override
public void onFailure(Exception e) {
// <2>
}
};
// end::reindex-execute-listener
// Replace the empty listener by a blocking listener in test
final CountDownLatch latch = new CountDownLatch(1);
listener = new LatchedActionListener<>(listener, latch);
// tag::reindex-execute-async
client.reindexAsync(request, RequestOptions.DEFAULT, listener); // <1>
// end::reindex-execute-async
assertTrue(latch.await(30L, TimeUnit.SECONDS));
}
}
public void testGet() throws Exception {
RestHighLevelClient client = highLevelClient();
{


@@ -0,0 +1,215 @@
[[java-rest-high-document-reindex]]
=== Reindex API
[[java-rest-high-document-reindex-request]]
==== Reindex Request
A `ReindexRequest` can be used to copy documents from one or more indices into a destination index.
It requires an existing source index and a target index which may or may not exist before the request. Reindex does not attempt
to set up the destination index and does not copy the settings of the source index. You should set up the destination
index prior to running a `_reindex` action, including setting up mappings, shard counts, replicas, etc.
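For example, the destination index could be set up beforehand through the create index API, as in this hedged sketch
(the index name, settings and mapping are illustrative):

["source","java"]
--------------------------------------------------
// Illustrative setup of the destination index before reindexing
CreateIndexRequest createIndexRequest = new CreateIndexRequest("dest");
createIndexRequest.settings(Settings.builder()
    .put("index.number_of_shards", 1)
    .put("index.number_of_replicas", 0));
createIndexRequest.mapping("doc", "user", "type=text"); // mirror the source mapping as needed
client.indices().create(createIndexRequest, RequestOptions.DEFAULT);
--------------------------------------------------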
The simplest form of a `ReindexRequest` looks as follows:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-request]
--------------------------------------------------
<1> Creates the `ReindexRequest`
<2> Adds a list of sources to copy from
<3> Adds the destination index
The `dest` element can be configured like the index API to control optimistic concurrency control. Just leaving out
`versionType` (as above) or setting it to `internal` will cause Elasticsearch to blindly dump documents into the target.
Setting `versionType` to `external` will cause Elasticsearch to preserve the version from the source, create any documents
that are missing, and update any documents that have an older version in the destination index than they do in the
source index.
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-request-versionType]
--------------------------------------------------
<1> Set the versionType to `EXTERNAL`
Setting `opType` to `create` will cause `_reindex` to only create missing documents in the target index. All existing
documents will cause a version conflict. The default `opType` is `index`.
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-request-opType]
--------------------------------------------------
<1> Set the opType to `create`
By default version conflicts abort the `_reindex` process but you can count them instead by setting `conflicts` to `proceed`
in the request body.
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-request-conflicts]
--------------------------------------------------
<1> Set `proceed` on version conflict
You can limit the documents by adding a type to the source or by adding a query.
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-request-typeOrQuery]
--------------------------------------------------
<1> Only copy `doc` type
<2> Only copy documents which have field `user` set to `kimchy`
It's also possible to limit the number of processed documents by setting `size`.
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-request-size]
--------------------------------------------------
<1> Only copy 10 documents
By default `_reindex` uses batches of 1000. You can change the batch size with `sourceBatchSize`.
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-request-sourceSize]
--------------------------------------------------
<1> Use batches of 100 documents
Reindex can also use the ingest feature by specifying a `pipeline`.
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-request-pipeline]
--------------------------------------------------
<1> Set the pipeline to `my_pipeline`
If you want a particular set of documents from the source index you'll need to use `sort`. If possible, prefer a more
selective query to `size` and `sort`.
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-request-sort]
--------------------------------------------------
<1> Add descending sort to `field1`
<2> Add ascending sort to `field2`
`ReindexRequest` also supports a `script` that modifies the document. It also allows you to change the document's
metadata. The following example illustrates that.
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-request-script]
--------------------------------------------------
<1> `setScript` to increment the `likes` field on all documents with user `kimchy`.
`ReindexRequest` supports reindexing from a remote Elasticsearch cluster. When using a remote cluster the query should be
specified inside the `RemoteInfo` object and not via `setSourceQuery`. If both the remote info and the source query are
set it results in a validation error during the request. The reason for this is that the remote Elasticsearch cluster may not
understand queries built by the modern query builders. Remote cluster support works all the way back to Elasticsearch
0.90 and the query language has changed since then. When reindexing from older versions, it is safer to write the query by hand
in JSON.
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-request-remote]
--------------------------------------------------
<1> Set the remote Elasticsearch cluster information
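For remote clusters too old to understand the modern query builders, the query can instead be written by hand as JSON.
A hedged sketch (host, port and credentials are illustrative):

["source","java"]
--------------------------------------------------
// Hand-written JSON query for a remote cluster that predates the query builders
String remoteQuery = "{\"match_all\": {}}"; // illustrative query
request.setRemoteInfo(
    new RemoteInfo(
        "http", "otherhost", 9200, null, new BytesArray(remoteQuery),
        "user", "pass", Collections.emptyMap(),
        RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT
    )
);
--------------------------------------------------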
`ReindexRequest` can also parallelize automatically, using `sliced-scroll` to
slice on `_uid`. Use `setSlices` to specify the number of slices to use.
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-request-slices]
--------------------------------------------------
<1> Set the number of slices to use
`ReindexRequest` uses the `scroll` parameter to control how long it keeps the "search context" alive.
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-request-scroll]
--------------------------------------------------
<1> Set the scroll time
==== Optional arguments
In addition to the options above, the following arguments can also optionally be provided:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-request-timeout]
--------------------------------------------------
<1> Timeout to wait for the reindex request to be performed as a `TimeValue`
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-request-refresh]
--------------------------------------------------
<1> Refresh index after calling reindex
[[java-rest-high-document-reindex-sync]]
==== Synchronous Execution
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-execute]
--------------------------------------------------
[[java-rest-high-document-reindex-async]]
==== Asynchronous Execution
The asynchronous execution of a reindex request requires both the `ReindexRequest`
instance and an `ActionListener` instance to be passed to the asynchronous
method:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-execute-async]
--------------------------------------------------
<1> The `ReindexRequest` to execute and the `ActionListener` to use when
the execution completes
The asynchronous method does not block and returns immediately. Once it is
completed the `ActionListener` is called back using the `onResponse` method
if the execution successfully completed or using the `onFailure` method if
it failed.
A typical listener for `BulkByScrollResponse` looks like:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-execute-listener]
--------------------------------------------------
<1> Called when the execution is successfully completed. The response is
provided as an argument and contains a list of individual results for each
operation that was executed. Note that one or more operations might have
failed while the others have been successfully executed.
<2> Called when the whole `ReindexRequest` fails. In this case the raised
exception is provided as an argument and no operation has been executed.
[[java-rest-high-document-reindex-response]]
==== Reindex Response
The returned `BulkByScrollResponse` contains information about the executed operations and
allows iterating over each result as follows:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-response]
--------------------------------------------------
<1> Get total time taken
<2> Check if the request timed out
<3> Get total number of docs processed
<4> Number of docs that were updated
<5> Number of docs that were created
<6> Number of docs that were deleted
<7> Number of batches that were executed
<8> Number of skipped docs
<9> Number of version conflicts
<10> Number of times request had to retry bulk index operations
<11> Number of times request had to retry search operations
<12> The total time this request has throttled itself, not including the current throttle time if it is currently sleeping
<13> Remaining delay of any current throttle sleep or 0 if not sleeping
<14> Failures during search phase
<15> Failures during bulk index operation


@@ -15,6 +15,7 @@ Single document APIs::
Multi-document APIs::
* <<java-rest-high-document-bulk>>
* <<java-rest-high-document-multi-get>>
* <<java-rest-high-document-reindex>>
include::document/index.asciidoc[]
include::document/get.asciidoc[]
@@ -23,6 +24,7 @@ include::document/delete.asciidoc[]
include::document/update.asciidoc[]
include::document/bulk.asciidoc[]
include::document/multi-get.asciidoc[]
include::document/reindex.asciidoc[]
== Search APIs


@@ -20,7 +20,6 @@
package org.elasticsearch.index.reindex;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
@@ -118,7 +117,7 @@ public class RestReindexAction extends AbstractBaseReindexRestHandler<ReindexReq
throw new IllegalArgumentException("_reindex doesn't support [pipeline] as a query parameter. "
+ "Specify it in the [dest] object instead.");
}
ReindexRequest internal = new ReindexRequest(new SearchRequest(), new IndexRequest());
ReindexRequest internal = new ReindexRequest();
try (XContentParser parser = request.contentParser()) {
PARSER.parse(parser, internal, null);
}


@@ -21,7 +21,6 @@ package org.elasticsearch.index.reindex;
import org.elasticsearch.index.reindex.ScrollableHitSource.Hit;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.common.settings.Settings;
/**
@@ -73,7 +72,7 @@ public class ReindexMetadataTests extends AbstractAsyncBulkByScrollActionMetadat
@Override
protected ReindexRequest request() {
return new ReindexRequest(new SearchRequest(), new IndexRequest());
return new ReindexRequest();
}
private class TestAction extends TransportReindexAction.AsyncIndexBySearchAction {


@@ -20,7 +20,6 @@
package org.elasticsearch.index.reindex;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.script.ScriptService;
@@ -100,7 +99,7 @@ public class ReindexScriptTests extends AbstractAsyncBulkByScrollActionScriptTes
@Override
protected ReindexRequest request() {
return new ReindexRequest(new SearchRequest(), new IndexRequest());
return new ReindexRequest();
}
@Override


@@ -19,8 +19,6 @@
package org.elasticsearch.index.reindex;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
@@ -144,7 +142,7 @@ public class RestReindexActionTests extends ESTestCase {
request = BytesReference.bytes(b);
}
try (XContentParser p = createParser(JsonXContent.jsonXContent, request)) {
ReindexRequest r = new ReindexRequest(new SearchRequest(), new IndexRequest());
ReindexRequest r = new ReindexRequest();
RestReindexAction.PARSER.parse(p, r, null);
assertEquals("localhost", r.getRemoteInfo().getHost());
assertArrayEquals(new String[] {"source"}, r.getSearchRequest().indices());


@@ -20,7 +20,6 @@
package org.elasticsearch.index.reindex;
import org.elasticsearch.Version;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
@@ -47,7 +46,7 @@ import static org.elasticsearch.common.unit.TimeValue.parseTimeValue;
*/
public class RoundTripTests extends ESTestCase {
public void testReindexRequest() throws IOException {
ReindexRequest reindex = new ReindexRequest(new SearchRequest(), new IndexRequest());
ReindexRequest reindex = new ReindexRequest();
randomRequest(reindex);
reindex.getDestination().version(randomFrom(Versions.MATCH_ANY, Versions.MATCH_DELETED, 12L, 1L, 123124L, 12L));
reindex.getDestination().index("test");


@@ -28,11 +28,13 @@ import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.StatusToXContentObject;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -42,6 +44,8 @@ import org.elasticsearch.rest.RestStatus;
import java.io.IOException;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.elasticsearch.common.xcontent.XContentParserUtils.throwUnknownField;
@@ -161,11 +165,11 @@ public class BulkItemResponse implements Streamable, StatusToXContentObject {
* Represents a failure.
*/
public static class Failure implements Writeable, ToXContentFragment {
static final String INDEX_FIELD = "index";
static final String TYPE_FIELD = "type";
static final String ID_FIELD = "id";
static final String CAUSE_FIELD = "cause";
static final String STATUS_FIELD = "status";
public static final String INDEX_FIELD = "index";
public static final String TYPE_FIELD = "type";
public static final String ID_FIELD = "id";
public static final String CAUSE_FIELD = "cause";
public static final String STATUS_FIELD = "status";
private final String index;
private final String type;
@@ -175,6 +179,23 @@ public class BulkItemResponse implements Streamable, StatusToXContentObject {
private final long seqNo;
private final boolean aborted;
public static ConstructingObjectParser<Failure, Void> PARSER =
new ConstructingObjectParser<>(
"bulk_failures",
true,
a ->
new Failure(
(String)a[0], (String)a[1], (String)a[2], (Exception)a[3], RestStatus.fromCode((int)a[4])
)
);
static {
PARSER.declareString(constructorArg(), new ParseField(INDEX_FIELD));
PARSER.declareString(constructorArg(), new ParseField(TYPE_FIELD));
PARSER.declareString(optionalConstructorArg(), new ParseField(ID_FIELD));
PARSER.declareObject(constructorArg(), (p, c) -> ElasticsearchException.fromXContent(p), new ParseField(CAUSE_FIELD));
PARSER.declareInt(constructorArg(), new ParseField(STATUS_FIELD));
}
/**
* For write failures before operation was assigned a sequence number.
*
@@ -322,6 +343,10 @@ public class BulkItemResponse implements Streamable, StatusToXContentObject {
return builder;
}
public static Failure fromXContent(XContentParser parser) {
return PARSER.apply(parser, null);
}
@Override
public String toString() {
return Strings.toString(this);


@@ -252,6 +252,14 @@ public abstract class AbstractBulkByScrollRequest<Self extends AbstractBulkByScr
return self();
}
/**
* Timeout to wait for the shards to be available for each bulk request.
*/
public Self setTimeout(String timeout) {
this.timeout = TimeValue.parseTimeValue(timeout, this.timeout, getClass().getSimpleName() + ".timeout");
return self();
}
/**
* The number of shard copies that must be active before proceeding with the write.
*/


@@ -19,14 +19,23 @@
package org.elasticsearch.index.reindex;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.index.reindex.BulkByScrollTask.Status;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentParser.Token;
import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException;
import java.util.ArrayList;
@@ -36,6 +45,7 @@ import static java.lang.Math.max;
import static java.lang.Math.min;
import static java.util.Objects.requireNonNull;
import static org.elasticsearch.common.unit.TimeValue.timeValueNanos;
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
/**
* Response used for actions that index many documents using a scroll request.
@@ -47,6 +57,27 @@ public class BulkByScrollResponse extends ActionResponse implements ToXContentFr
private List<ScrollableHitSource.SearchFailure> searchFailures;
private boolean timedOut;
private static final String TOOK_FIELD = "took";
private static final String TIMED_OUT_FIELD = "timed_out";
private static final String FAILURES_FIELD = "failures";
@SuppressWarnings("unchecked")
private static final ObjectParser<BulkByScrollResponseBuilder, Void> PARSER =
new ObjectParser<>(
"bulk_by_scroll_response",
true,
BulkByScrollResponseBuilder::new
);
static {
PARSER.declareLong(BulkByScrollResponseBuilder::setTook, new ParseField(TOOK_FIELD));
PARSER.declareBoolean(BulkByScrollResponseBuilder::setTimedOut, new ParseField(TIMED_OUT_FIELD));
PARSER.declareObjectArray(
BulkByScrollResponseBuilder::setFailures, (p, c) -> parseFailure(p), new ParseField(FAILURES_FIELD)
);
// the fields of BulkByScrollTask.Status are mixed in at this level, so we parse them here as well
Status.declareFields(PARSER);
}
public BulkByScrollResponse() {
}
@@ -87,6 +118,10 @@ public class BulkByScrollResponse extends ActionResponse implements ToXContentFr
return status.getCreated();
}
public long getTotal() {
return status.getTotal();
}
public long getDeleted() {
return status.getDeleted();
}
@@ -171,8 +206,8 @@ public class BulkByScrollResponse extends ActionResponse implements ToXContentFr
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field("took", took.millis());
builder.field("timed_out", timedOut);
builder.field(TOOK_FIELD, took.millis());
builder.field(TIMED_OUT_FIELD, timedOut);
status.innerXContent(builder, params);
builder.startArray("failures");
for (Failure failure: bulkFailures) {
@@ -187,6 +222,80 @@ public class BulkByScrollResponse extends ActionResponse implements ToXContentFr
return builder;
}
public static BulkByScrollResponse fromXContent(XContentParser parser) {
return PARSER.apply(parser, null).buildResponse();
}
private static Object parseFailure(XContentParser parser) throws IOException {
ensureExpectedToken(Token.START_OBJECT, parser.currentToken(), parser::getTokenLocation);
Token token;
String index = null;
String type = null;
String id = null;
Integer status = null;
Integer shardId = null;
String nodeId = null;
ElasticsearchException bulkExc = null;
ElasticsearchException searchExc = null;
while ((token = parser.nextToken()) != Token.END_OBJECT) {
ensureExpectedToken(Token.FIELD_NAME, token, parser::getTokenLocation);
String name = parser.currentName();
token = parser.nextToken();
if (token == Token.START_ARRAY) {
parser.skipChildren();
} else if (token == Token.START_OBJECT) {
switch (name) {
case SearchFailure.REASON_FIELD:
searchExc = ElasticsearchException.fromXContent(parser);
break;
case Failure.CAUSE_FIELD:
bulkExc = ElasticsearchException.fromXContent(parser);
break;
default:
parser.skipChildren();
}
} else if (token == Token.VALUE_STRING) {
switch (name) {
// This field is the same as SearchFailure.index
case Failure.INDEX_FIELD:
index = parser.text();
break;
case Failure.TYPE_FIELD:
type = parser.text();
break;
case Failure.ID_FIELD:
id = parser.text();
break;
case SearchFailure.NODE_FIELD:
nodeId = parser.text();
break;
default:
// Do nothing
break;
}
} else if (token == Token.VALUE_NUMBER) {
switch (name) {
case Failure.STATUS_FIELD:
status = parser.intValue();
break;
case SearchFailure.SHARD_FIELD:
shardId = parser.intValue();
break;
default:
// Do nothing
break;
}
}
}
if (bulkExc != null) {
return new Failure(index, type, id, bulkExc, RestStatus.fromCode(status));
} else if (searchExc != null) {
return new SearchFailure(searchExc, index, shardId, nodeId);
} else {
throw new ElasticsearchParseException("failed to parse failures array. At least one of {reason,cause} must be present");
}
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();


@@ -0,0 +1,76 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.reindex;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure;
import org.elasticsearch.index.reindex.BulkByScrollTask.StatusBuilder;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* Helps build a {@link BulkByScrollResponse}. Used by an instance of {@link ObjectParser} when parsing from XContent.
*/
class BulkByScrollResponseBuilder extends StatusBuilder {
private TimeValue took;
private BulkByScrollTask.Status status;
private List<Failure> bulkFailures = new ArrayList<>();
private List<SearchFailure> searchFailures = new ArrayList<>();
private boolean timedOut;
BulkByScrollResponseBuilder() {}
public void setTook(long took) {
setTook(new TimeValue(took, TimeUnit.MILLISECONDS));
}
public void setTook(TimeValue took) {
this.took = took;
}
public void setStatus(BulkByScrollTask.Status status) {
this.status = status;
}
public void setFailures(List<Object> failures) {
if (failures != null) {
for (Object object: failures) {
if (object instanceof Failure) {
bulkFailures.add((Failure) object);
} else if (object instanceof SearchFailure) {
searchFailures.add((SearchFailure) object);
}
}
}
}
public void setTimedOut(boolean timedOut) {
this.timedOut = timedOut;
}
public BulkByScrollResponse buildResponse() {
status = super.buildStatus();
return new BulkByScrollResponse(took, status, bulkFailures, searchFailures, timedOut);
}
}


@@ -21,27 +21,40 @@ package org.elasticsearch.index.reindex;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParseException;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentParser.Token;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskInfo;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import static java.lang.Math.min;
import static java.util.Collections.emptyList;
import static org.elasticsearch.common.unit.TimeValue.timeValueNanos;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
/**
* Task storing information about a currently running BulkByScroll request.
@@ -188,6 +201,120 @@ public class BulkByScrollTask extends CancellableTask {
return true;
}
/**
* This class acts as a builder for {@link Status}. Once the {@link Status} object is built by calling
* {@link #buildStatus()} it is immutable. Used by an instance of {@link ObjectParser} when parsing from
* XContent.
*/
public static class StatusBuilder {
private Integer sliceId = null;
private Long total = null;
private Long updated = null;
private Long created = null;
private Long deleted = null;
private Integer batches = null;
private Long versionConflicts = null;
private Long noops = null;
private Long bulkRetries = null;
private Long searchRetries = null;
private TimeValue throttled = null;
private Float requestsPerSecond = null;
private String reasonCancelled = null;
private TimeValue throttledUntil = null;
private List<StatusOrException> sliceStatuses = emptyList();
public void setSliceId(Integer sliceId) {
this.sliceId = sliceId;
}
public void setTotal(Long total) {
this.total = total;
}
public void setUpdated(Long updated) {
this.updated = updated;
}
public void setCreated(Long created) {
this.created = created;
}
public void setDeleted(Long deleted) {
this.deleted = deleted;
}
public void setBatches(Integer batches) {
this.batches = batches;
}
public void setVersionConflicts(Long versionConflicts) {
this.versionConflicts = versionConflicts;
}
public void setNoops(Long noops) {
this.noops = noops;
}
public void setRetries(Tuple<Long, Long> retries) {
if (retries != null) {
setBulkRetries(retries.v1());
setSearchRetries(retries.v2());
}
}
public void setBulkRetries(Long bulkRetries) {
this.bulkRetries = bulkRetries;
}
public void setSearchRetries(Long searchRetries) {
this.searchRetries = searchRetries;
}
public void setThrottled(Long throttled) {
if (throttled != null) {
this.throttled = new TimeValue(throttled, TimeUnit.MILLISECONDS);
}
}
public void setRequestsPerSecond(Float requestsPerSecond) {
if (requestsPerSecond != null) {
requestsPerSecond = requestsPerSecond == -1 ? Float.POSITIVE_INFINITY : requestsPerSecond;
this.requestsPerSecond = requestsPerSecond;
}
}
public void setReasonCancelled(String reasonCancelled) {
this.reasonCancelled = reasonCancelled;
}
public void setThrottledUntil(Long throttledUntil) {
if (throttledUntil != null) {
this.throttledUntil = new TimeValue(throttledUntil, TimeUnit.MILLISECONDS);
}
}
public void setSliceStatuses(List<StatusOrException> sliceStatuses) {
if (sliceStatuses != null) {
this.sliceStatuses = sliceStatuses;
}
}
public Status buildStatus() {
if (sliceStatuses.isEmpty()) {
try {
return new Status(
sliceId, total, updated, created, deleted, batches, versionConflicts, noops, bulkRetries,
searchRetries, throttled, requestsPerSecond, reasonCancelled, throttledUntil
);
} catch (NullPointerException npe) {
throw new IllegalArgumentException("a required field is null when building Status");
}
} else {
return new Status(sliceStatuses, reasonCancelled);
}
}
}
public static class Status implements Task.Status, SuccessfullyProcessed {
public static final String NAME = "bulk-by-scroll";
@@ -203,6 +330,76 @@ public class BulkByScrollTask extends CancellableTask {
*/
public static final String INCLUDE_UPDATED = "include_updated";
public static final String SLICE_ID_FIELD = "slice_id";
public static final String TOTAL_FIELD = "total";
public static final String UPDATED_FIELD = "updated";
public static final String CREATED_FIELD = "created";
public static final String DELETED_FIELD = "deleted";
public static final String BATCHES_FIELD = "batches";
public static final String VERSION_CONFLICTS_FIELD = "version_conflicts";
public static final String NOOPS_FIELD = "noops";
public static final String RETRIES_FIELD = "retries";
public static final String RETRIES_BULK_FIELD = "bulk";
public static final String RETRIES_SEARCH_FIELD = "search";
public static final String THROTTLED_RAW_FIELD = "throttled_millis";
public static final String THROTTLED_HR_FIELD = "throttled";
public static final String REQUESTS_PER_SEC_FIELD = "requests_per_second";
public static final String CANCELED_FIELD = "canceled";
public static final String THROTTLED_UNTIL_RAW_FIELD = "throttled_until_millis";
public static final String THROTTLED_UNTIL_HR_FIELD = "throttled_until";
public static final String SLICES_FIELD = "slices";
public static Set<String> FIELDS_SET = new HashSet<>();
static {
FIELDS_SET.add(SLICE_ID_FIELD);
FIELDS_SET.add(TOTAL_FIELD);
FIELDS_SET.add(UPDATED_FIELD);
FIELDS_SET.add(CREATED_FIELD);
FIELDS_SET.add(DELETED_FIELD);
FIELDS_SET.add(BATCHES_FIELD);
FIELDS_SET.add(VERSION_CONFLICTS_FIELD);
FIELDS_SET.add(NOOPS_FIELD);
FIELDS_SET.add(RETRIES_FIELD);
// No need for inner level fields for retries in the set of outer level fields
FIELDS_SET.add(THROTTLED_RAW_FIELD);
FIELDS_SET.add(THROTTLED_HR_FIELD);
FIELDS_SET.add(REQUESTS_PER_SEC_FIELD);
FIELDS_SET.add(CANCELED_FIELD);
FIELDS_SET.add(THROTTLED_UNTIL_RAW_FIELD);
FIELDS_SET.add(THROTTLED_UNTIL_HR_FIELD);
FIELDS_SET.add(SLICES_FIELD);
}
@SuppressWarnings("unchecked")
static ConstructingObjectParser<Tuple<Long, Long>, Void> RETRIES_PARSER = new ConstructingObjectParser<>(
"bulk_by_scroll_task_status_retries",
true,
a -> new Tuple(a[0], a[1])
);
static {
RETRIES_PARSER.declareLong(constructorArg(), new ParseField(RETRIES_BULK_FIELD));
RETRIES_PARSER.declareLong(constructorArg(), new ParseField(RETRIES_SEARCH_FIELD));
}
public static void declareFields(ObjectParser<? extends StatusBuilder, Void> parser) {
parser.declareInt(StatusBuilder::setSliceId, new ParseField(SLICE_ID_FIELD));
parser.declareLong(StatusBuilder::setTotal, new ParseField(TOTAL_FIELD));
parser.declareLong(StatusBuilder::setUpdated, new ParseField(UPDATED_FIELD));
parser.declareLong(StatusBuilder::setCreated, new ParseField(CREATED_FIELD));
parser.declareLong(StatusBuilder::setDeleted, new ParseField(DELETED_FIELD));
parser.declareInt(StatusBuilder::setBatches, new ParseField(BATCHES_FIELD));
parser.declareLong(StatusBuilder::setVersionConflicts, new ParseField(VERSION_CONFLICTS_FIELD));
parser.declareLong(StatusBuilder::setNoops, new ParseField(NOOPS_FIELD));
parser.declareObject(StatusBuilder::setRetries, RETRIES_PARSER, new ParseField(RETRIES_FIELD));
parser.declareLong(StatusBuilder::setThrottled, new ParseField(THROTTLED_RAW_FIELD));
parser.declareFloat(StatusBuilder::setRequestsPerSecond, new ParseField(REQUESTS_PER_SEC_FIELD));
parser.declareString(StatusBuilder::setReasonCancelled, new ParseField(CANCELED_FIELD));
parser.declareLong(StatusBuilder::setThrottledUntil, new ParseField(THROTTLED_UNTIL_RAW_FIELD));
parser.declareObjectArray(
StatusBuilder::setSliceStatuses, (p, c) -> StatusOrException.fromXContent(p), new ParseField(SLICES_FIELD)
);
}
private final Integer sliceId;
private final long total;
private final long updated;
@@ -353,35 +550,40 @@
return builder.endObject();
}
/**
* We need to write a manual parser for this because of {@link StatusOrException}. Since
* {@link StatusOrException#fromXContent(XContentParser)} tries to peek at a field first before deciding
* what needs to be parsed, it cannot use an {@link ObjectParser}.
*/
public XContentBuilder innerXContent(XContentBuilder builder, Params params)
throws IOException {
if (sliceId != null) {
builder.field("slice_id", sliceId);
builder.field(SLICE_ID_FIELD, sliceId);
}
builder.field("total", total);
builder.field(TOTAL_FIELD, total);
if (params.paramAsBoolean(INCLUDE_UPDATED, true)) {
builder.field("updated", updated);
builder.field(UPDATED_FIELD, updated);
}
if (params.paramAsBoolean(INCLUDE_CREATED, true)) {
builder.field("created", created);
builder.field(CREATED_FIELD, created);
}
builder.field("deleted", deleted);
builder.field("batches", batches);
builder.field("version_conflicts", versionConflicts);
builder.field("noops", noops);
builder.startObject("retries"); {
builder.field("bulk", bulkRetries);
builder.field("search", searchRetries);
builder.field(DELETED_FIELD, deleted);
builder.field(BATCHES_FIELD, batches);
builder.field(VERSION_CONFLICTS_FIELD, versionConflicts);
builder.field(NOOPS_FIELD, noops);
builder.startObject(RETRIES_FIELD); {
builder.field(RETRIES_BULK_FIELD, bulkRetries);
builder.field(RETRIES_SEARCH_FIELD, searchRetries);
}
builder.endObject();
builder.humanReadableField("throttled_millis", "throttled", throttled);
builder.field("requests_per_second", requestsPerSecond == Float.POSITIVE_INFINITY ? -1 : requestsPerSecond);
builder.humanReadableField(THROTTLED_RAW_FIELD, THROTTLED_HR_FIELD, throttled);
builder.field(REQUESTS_PER_SEC_FIELD, requestsPerSecond == Float.POSITIVE_INFINITY ? -1 : requestsPerSecond);
if (reasonCancelled != null) {
builder.field("canceled", reasonCancelled);
builder.field(CANCELED_FIELD, reasonCancelled);
}
builder.humanReadableField("throttled_until_millis", "throttled_until", throttledUntil);
builder.humanReadableField(THROTTLED_UNTIL_RAW_FIELD, THROTTLED_UNTIL_HR_FIELD, throttledUntil);
if (false == sliceStatuses.isEmpty()) {
builder.startArray("slices");
builder.startArray(SLICES_FIELD);
for (StatusOrException slice : sliceStatuses) {
if (slice == null) {
builder.nullValue();
@@ -394,6 +596,114 @@ public class BulkByScrollTask extends CancellableTask {
return builder;
}
public static Status fromXContent(XContentParser parser) throws IOException {
XContentParser.Token token = parser.nextToken();
ensureExpectedToken(Token.START_OBJECT, token, parser::getTokenLocation);
token = parser.nextToken();
ensureExpectedToken(Token.FIELD_NAME, token, parser::getTokenLocation);
return innerFromXContent(parser);
}
public static Status innerFromXContent(XContentParser parser) throws IOException {
Token token = parser.currentToken();
String fieldName = parser.currentName();
ensureExpectedToken(XContentParser.Token.FIELD_NAME, token, parser::getTokenLocation);
Integer sliceId = null;
Long total = null;
Long updated = null;
Long created = null;
Long deleted = null;
Integer batches = null;
Long versionConflicts = null;
Long noOps = null;
Long bulkRetries = null;
Long searchRetries = null;
TimeValue throttled = null;
Float requestsPerSecond = null;
String reasonCancelled = null;
TimeValue throttledUntil = null;
List<StatusOrException> sliceStatuses = new ArrayList<>();
while ((token = parser.nextToken()) != Token.END_OBJECT) {
if (token == Token.FIELD_NAME) {
fieldName = parser.currentName();
} else if (token == Token.START_OBJECT) {
if (fieldName.equals(Status.RETRIES_FIELD)) {
Tuple<Long, Long> retries =
Status.RETRIES_PARSER.parse(parser, null);
bulkRetries = retries.v1();
searchRetries = retries.v2();
} else {
parser.skipChildren();
}
} else if (token == Token.START_ARRAY) {
if (fieldName.equals(Status.SLICES_FIELD)) {
while ((token = parser.nextToken()) != Token.END_ARRAY) {
sliceStatuses.add(StatusOrException.fromXContent(parser));
}
} else {
parser.skipChildren();
}
} else { // otherwise it is a plain value
switch (fieldName) {
case Status.SLICE_ID_FIELD:
sliceId = parser.intValue();
break;
case Status.TOTAL_FIELD:
total = parser.longValue();
break;
case Status.UPDATED_FIELD:
updated = parser.longValue();
break;
case Status.CREATED_FIELD:
created = parser.longValue();
break;
case Status.DELETED_FIELD:
deleted = parser.longValue();
break;
case Status.BATCHES_FIELD:
batches = parser.intValue();
break;
case Status.VERSION_CONFLICTS_FIELD:
versionConflicts = parser.longValue();
break;
case Status.NOOPS_FIELD:
noOps = parser.longValue();
break;
case Status.THROTTLED_RAW_FIELD:
throttled = new TimeValue(parser.longValue(), TimeUnit.MILLISECONDS);
break;
case Status.REQUESTS_PER_SEC_FIELD:
requestsPerSecond = parser.floatValue();
requestsPerSecond = requestsPerSecond == -1 ? Float.POSITIVE_INFINITY : requestsPerSecond;
break;
case Status.CANCELED_FIELD:
reasonCancelled = parser.text();
break;
case Status.THROTTLED_UNTIL_RAW_FIELD:
throttledUntil = new TimeValue(parser.longValue(), TimeUnit.MILLISECONDS);
break;
default:
break;
}
}
}
if (sliceStatuses.isEmpty()) {
return
new Status(
sliceId, total, updated, created, deleted, batches, versionConflicts, noOps, bulkRetries,
searchRetries, throttled, requestsPerSecond, reasonCancelled, throttledUntil
);
} else {
return new Status(sliceStatuses, reasonCancelled);
}
}
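// For reference, a sketch of the status object that innerFromXContent consumes.
// Field names match the constants used in toXContent above; the values are
// illustrative only:
//
//   {
//     "slice_id": 0, "total": 100, "updated": 90, "created": 10, "deleted": 0,
//     "batches": 1, "version_conflicts": 0, "noops": 0,
//     "retries": { "bulk": 0, "search": 0 },
//     "throttled_millis": 0, "requests_per_second": -1,
//     "throttled_until_millis": 0
//   }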
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
@ -520,6 +830,44 @@ public class BulkByScrollTask extends CancellableTask {
return sliceStatuses;
}
@Override
public int hashCode() {
return Objects.hash(
sliceId, total, updated, created, deleted, batches, versionConflicts, noops, searchRetries,
bulkRetries, throttled, requestsPerSecond, reasonCancelled, throttledUntil, sliceStatuses
);
}
public boolean equalsWithoutSliceStatus(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Status other = (Status) o;
return
Objects.equals(sliceId, other.sliceId) &&
total == other.total &&
updated == other.updated &&
created == other.created &&
deleted == other.deleted &&
batches == other.batches &&
versionConflicts == other.versionConflicts &&
noops == other.noops &&
searchRetries == other.searchRetries &&
bulkRetries == other.bulkRetries &&
Objects.equals(throttled, other.throttled) &&
requestsPerSecond == other.requestsPerSecond &&
Objects.equals(reasonCancelled, other.reasonCancelled) &&
Objects.equals(throttledUntil, other.throttledUntil);
}
@Override
public boolean equals(Object o) {
if (equalsWithoutSliceStatus(o)) {
return Objects.equals(sliceStatuses, ((Status) o).sliceStatuses);
} else {
return false;
}
}
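// equalsWithoutSliceStatus exists because slice statuses can wrap exceptions,
// which do not round-trip through xcontent as the same class; tests compare
// all other fields strictly and the slice statuses leniently.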
private int checkPositive(int value, String name) {
if (value < 0) {
throw new IllegalArgumentException(name + " must be greater than or equal to 0 but was [" + value + "]");
@ -543,6 +891,19 @@ public class BulkByScrollTask extends CancellableTask {
private final Status status;
private final Exception exception;
public static final Set<String> EXPECTED_EXCEPTION_FIELDS = new HashSet<>();
static {
EXPECTED_EXCEPTION_FIELDS.add("type");
EXPECTED_EXCEPTION_FIELDS.add("reason");
EXPECTED_EXCEPTION_FIELDS.add("caused_by");
EXPECTED_EXCEPTION_FIELDS.add("suppressed");
EXPECTED_EXCEPTION_FIELDS.add("stack_trace");
EXPECTED_EXCEPTION_FIELDS.add("header");
EXPECTED_EXCEPTION_FIELDS.add("error");
EXPECTED_EXCEPTION_FIELDS.add("root_cause");
}
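// These are the standard field names produced when an ElasticsearchException
// is rendered as xcontent; seeing any of them tells the parser below that the
// object is an exception rather than a Status. A typical shape (sketch):
//   { "type": "illegal_argument_exception", "reason": "..." }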
public StatusOrException(Status status) {
this.status = status;
exception = null;
@ -597,6 +958,48 @@ public class BulkByScrollTask extends CancellableTask {
return builder;
}
/**
* Since {@link StatusOrException} can contain either an {@link Exception} or a {@link Status} we need to peek
* at a field first before deciding what needs to be parsed, since the same object could contain either.
* {@link #EXPECTED_EXCEPTION_FIELDS} contains the fields that are expected when the serialized object
* was an instance of exception, and {@link Status#FIELDS_SET} is the set of fields expected when the
* serialized object was an instance of Status.
*/
public static StatusOrException fromXContent(XContentParser parser) throws IOException {
XContentParser.Token token = parser.currentToken();
if (token == null) {
token = parser.nextToken();
}
if (token == Token.VALUE_NULL) {
return null;
} else {
ensureExpectedToken(XContentParser.Token.START_OBJECT, token, parser::getTokenLocation);
token = parser.nextToken();
// This loop is present only to ignore unknown tokens. It breaks as soon as we find a field
// that is allowed.
while (token != Token.END_OBJECT) {
ensureExpectedToken(Token.FIELD_NAME, token, parser::getTokenLocation);
String fieldName = parser.currentName();
if (Status.FIELDS_SET.contains(fieldName)) {
return new StatusOrException(
Status.innerFromXContent(parser)
);
} else if (EXPECTED_EXCEPTION_FIELDS.contains(fieldName)){
return new StatusOrException(ElasticsearchException.innerFromXContent(parser, false));
} else {
// Ignore unknown tokens
token = parser.nextToken();
if (token == Token.START_OBJECT || token == Token.START_ARRAY) {
parser.skipChildren();
}
token = parser.nextToken();
}
}
throw new XContentParseException("Unable to parse StatusOrException. Expected fields not found.");
}
}
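// Note that the null handling above mirrors serialization: a null slice is
// written with builder.nullValue() in Status.toXContent, and is parsed back
// here as a VALUE_NULL token returning null.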
@Override
public boolean equals(Object obj) {
if (obj == null) {

View File

@ -26,6 +26,11 @@ import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.tasks.TaskId;
import java.io.IOException;
@ -39,7 +44,8 @@ import static org.elasticsearch.index.VersionType.INTERNAL;
of reasons, not least of which is that scripts are allowed to change the destination request in drastic ways, including changing the index
* to which documents are written.
*/
public class ReindexRequest extends AbstractBulkIndexByScrollRequest<ReindexRequest> implements CompositeIndicesRequest {
public class ReindexRequest extends AbstractBulkIndexByScrollRequest<ReindexRequest>
implements CompositeIndicesRequest, ToXContentObject {
/**
* Prototype for index requests.
*/
@ -48,9 +54,10 @@ public class ReindexRequest extends AbstractBulkIndexByScrollRequest<ReindexRequ
private RemoteInfo remoteInfo;
public ReindexRequest() {
this(new SearchRequest(), new IndexRequest(), true);
}
ReindexRequest(SearchRequest search, IndexRequest destination) {
this(search, destination, true);
}
@ -121,14 +128,124 @@ public class ReindexRequest extends AbstractBulkIndexByScrollRequest<ReindexRequ
}
}
/**
* Set the indices which will act as the source for the ReindexRequest
*/
public ReindexRequest setSourceIndices(String... sourceIndices) {
if (sourceIndices != null) {
this.getSearchRequest().indices(sourceIndices);
}
return this;
}
/**
* Set the document types which need to be copied from the source indices
*/
public ReindexRequest setSourceDocTypes(String... docTypes) {
if (docTypes != null) {
this.getSearchRequest().types(docTypes);
}
return this;
}
/**
* Sets the batch size, i.e. how many documents are processed per scroll batch during reindex
*/
public ReindexRequest setSourceBatchSize(int size) {
this.getSearchRequest().source().size(size);
return this;
}
/**
* Set the query for selecting documents from the source indices
*/
public ReindexRequest setSourceQuery(QueryBuilder queryBuilder) {
if (queryBuilder != null) {
this.getSearchRequest().source().query(queryBuilder);
}
return this;
}
/**
* Add a sort against the given field name.
*
* @param name The name of the field to sort by
* @param order The order in which to sort
*/
public ReindexRequest addSortField(String name, SortOrder order) {
this.getSearchRequest().source().sort(name, order);
return this;
}
/**
* Set the target index for the ReindexRequest
*/
public ReindexRequest setDestIndex(String destIndex) {
if (destIndex != null) {
this.getDestination().index(destIndex);
}
return this;
}
/**
* Set the document type for the destination index
*/
public ReindexRequest setDestDocType(String docType) {
this.getDestination().type(docType);
return this;
}
/**
* Set the routing to decide which shard the documents need to be routed to
*/
public ReindexRequest setDestRouting(String routing) {
this.getDestination().routing(routing);
return this;
}
/**
* Set the version type for the target index. Using {@link VersionType#EXTERNAL} preserves the source
* document's version if the document already exists in the target index.
*/
public ReindexRequest setDestVersionType(VersionType versionType) {
this.getDestination().versionType(versionType);
return this;
}
/**
* Set the ingest pipeline for the target index.
*/
public void setDestPipeline(String pipelineName) {
this.getDestination().setPipeline(pipelineName);
}
/**
* Sets the op_type on the destination index
* @param opType must be one of {create, index}
*/
public ReindexRequest setDestOpType(String opType) {
this.getDestination().opType(opType);
return this;
}
/**
* Set the {@link RemoteInfo} if the source indices are in a remote cluster.
*/
public ReindexRequest setRemoteInfo(RemoteInfo remoteInfo) {
this.remoteInfo = remoteInfo;
return this;
}
/**
* Gets the target for this reindex request in the form of an {@link IndexRequest}
*/
public IndexRequest getDestination() {
return destination;
}
/**
* Get the {@link RemoteInfo} if it was set for this request.
*/
public RemoteInfo getRemoteInfo() {
return remoteInfo;
}
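// A minimal usage sketch of the fluent API added above, assuming a
// RestHighLevelClient named "client"; the index names and the query are
// illustrative only:
//
//   ReindexRequest request = new ReindexRequest()
//       .setSourceIndices("source-index")
//       .setSourceBatchSize(500)
//       .setSourceQuery(QueryBuilders.termQuery("user", "kimchy"))
//       .setDestIndex("dest-index")
//       .setDestOpType("create");
//   BulkByScrollResponse response = client.reindex(request, RequestOptions.DEFAULT);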
@ -166,4 +283,51 @@ public class ReindexRequest extends AbstractBulkIndexByScrollRequest<ReindexRequ
}
return b.toString();
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
{
// build source
builder.startObject("source");
if (remoteInfo != null) {
builder.field("remote", remoteInfo);
}
builder.array("index", getSearchRequest().indices());
builder.array("type", getSearchRequest().types());
getSearchRequest().source().innerToXContent(builder, params);
builder.endObject();
}
{
// build destination
builder.startObject("dest");
builder.field("index", getDestination().index());
if (getDestination().type() != null) {
builder.field("type", getDestination().type());
}
if (getDestination().routing() != null) {
builder.field("routing", getDestination().routing());
}
builder.field("op_type", getDestination().opType().getLowercase());
if (getDestination().getPipeline() != null) {
builder.field("pipeline", getDestination().getPipeline());
}
builder.field("version_type", VersionType.toString(getDestination().versionType()));
builder.endObject();
}
{
// Other fields
if (getSize() > 0) { // only serialize size when it was explicitly set (default is -1)
builder.field("size", getSize());
}
if (getScript() != null) {
builder.field("script", getScript());
}
if (isAbortOnVersionConflict() == false) {
builder.field("conflicts", "proceed");
}
}
builder.endObject();
return builder;
}
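// For a request built as in the sketch above, toXContent emits a body along
// these lines (sketch; defaults shown, values illustrative):
//
//   {
//     "source": { "index": ["source-index"], "type": [], "size": 500,
//                 "query": { "term": { "user": ... } } },
//     "dest": { "index": "dest-index", "op_type": "create", "version_type": "internal" }
//   }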
}

View File

@ -26,6 +26,8 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.HashMap;
@ -35,7 +37,7 @@ import static java.util.Collections.unmodifiableMap;
import static java.util.Objects.requireNonNull;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
public class RemoteInfo implements Writeable {
public class RemoteInfo implements Writeable, ToXContentObject {
/**
* Default {@link #socketTimeout} for requests that don't have one set.
*/
@ -190,4 +192,25 @@ public class RemoteInfo implements Writeable {
}
return b.toString();
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
if (username != null) {
builder.field("username", username);
}
if (password != null) {
builder.field("password", password);
}
builder.field("host", scheme + "://" + host + ":" + port +
(pathPrefix == null ? "" : "/" + pathPrefix));
if (headers.size() > 0) {
builder.field("headers", headers);
}
builder.field("socket_timeout", socketTimeout.getStringRep());
builder.field("connect_timeout", connectTimeout.getStringRep());
builder.field("query", query);
builder.endObject();
return builder;
}
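// A sketch of the emitted object (values illustrative; "query" is the raw
// search body stored on this RemoteInfo), nested under "remote" inside the
// reindex request's "source" object:
//
//   { "host": "http://otherhost:9200", "username": "user", "password": "pass",
//     "socket_timeout": "30s", "connect_timeout": "30s", "query": { ... } }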
}

View File

@ -284,6 +284,11 @@ public abstract class ScrollableHitSource {
@Nullable
private final String nodeId;
public static final String INDEX_FIELD = "index";
public static final String SHARD_FIELD = "shard";
public static final String NODE_FIELD = "node";
public static final String REASON_FIELD = "reason";
public SearchFailure(Throwable reason, @Nullable String index, @Nullable Integer shardId, @Nullable String nodeId) {
this.index = index;
this.shardId = shardId;
@ -337,15 +342,15 @@ public abstract class ScrollableHitSource {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
if (index != null) {
builder.field("index", index);
builder.field(INDEX_FIELD, index);
}
if (shardId != null) {
builder.field("shard", shardId);
builder.field(SHARD_FIELD, shardId);
}
if (nodeId != null) {
builder.field("node", nodeId);
builder.field(NODE_FIELD, nodeId);
}
builder.field("reason");
builder.field(REASON_FIELD);
{
builder.startObject();
ElasticsearchException.generateThrowableXContent(builder, params, reason);

View File

@ -1150,9 +1150,7 @@ public final class SearchSourceBuilder implements Writeable, ToXContentObject, R
}
}
public XContentBuilder innerToXContent(XContentBuilder builder, Params params) throws IOException {
if (from != -1) {
builder.field(FROM_FIELD.getPreferredName(), from);
}
@ -1290,6 +1288,13 @@ public final class SearchSourceBuilder implements Writeable, ToXContentObject, R
if (collapse != null) {
builder.field(COLLAPSE.getPreferredName(), collapse);
}
return builder;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
innerToXContent(builder, params);
builder.endObject();
return builder;
}
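// Splitting innerToXContent out of toXContent lets ReindexRequest embed the
// search source's fields directly inside its own "source" object instead of
// nesting a second object, which is the shape the reindex REST body expects.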

View File

@ -24,7 +24,8 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractXContentTestCase;
import java.io.IOException;
import java.util.List;
@ -33,8 +34,9 @@ import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static org.apache.lucene.util.TestUtil.randomSimpleString;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
import static org.hamcrest.Matchers.containsString;
public class BulkByScrollResponseTests extends AbstractXContentTestCase<BulkByScrollResponse> {
public void testRoundTrip() throws IOException {
BulkByScrollResponse response = new BulkByScrollResponse(timeValueMillis(randomNonNegativeLong()),
@ -94,4 +96,49 @@ public class BulkByScrollResponseTests extends ESTestCase {
assertEquals(expectedFailure.getReason().getMessage(), actualFailure.getReason().getMessage());
}
}
@Override
protected void assertEqualInstances(BulkByScrollResponse expected, BulkByScrollResponse actual) {
assertEquals(expected.getTook(), actual.getTook());
BulkByScrollTaskStatusTests.assertEqualStatus(expected.getStatus(), actual.getStatus());
assertEquals(expected.getBulkFailures().size(), actual.getBulkFailures().size());
for (int i = 0; i < expected.getBulkFailures().size(); i++) {
Failure expectedFailure = expected.getBulkFailures().get(i);
Failure actualFailure = actual.getBulkFailures().get(i);
assertEquals(expectedFailure.getIndex(), actualFailure.getIndex());
assertEquals(expectedFailure.getType(), actualFailure.getType());
assertEquals(expectedFailure.getId(), actualFailure.getId());
assertThat(expectedFailure.getMessage(), containsString(actualFailure.getMessage()));
assertEquals(expectedFailure.getStatus(), actualFailure.getStatus());
}
assertEquals(expected.getSearchFailures().size(), actual.getSearchFailures().size());
for (int i = 0; i < expected.getSearchFailures().size(); i++) {
ScrollableHitSource.SearchFailure expectedFailure = expected.getSearchFailures().get(i);
ScrollableHitSource.SearchFailure actualFailure = actual.getSearchFailures().get(i);
assertEquals(expectedFailure.getIndex(), actualFailure.getIndex());
assertEquals(expectedFailure.getShardId(), actualFailure.getShardId());
assertEquals(expectedFailure.getNodeId(), actualFailure.getNodeId());
assertThat(expectedFailure.getReason().getMessage(), containsString(actualFailure.getReason().getMessage()));
}
}
@Override
protected BulkByScrollResponse createTestInstance() {
// failures are tested separately, so we can test XContent equivalence at least when we have no failures
return
new BulkByScrollResponse(
timeValueMillis(randomNonNegativeLong()), BulkByScrollTaskStatusTests.randomStatusWithoutException(),
emptyList(), emptyList(), randomBoolean()
);
}
@Override
protected BulkByScrollResponse doParseInstance(XContentParser parser) throws IOException {
return BulkByScrollResponse.fromXContent(parser);
}
@Override
protected boolean supportsUnknownFields() {
return true;
}
}

View File

@ -0,0 +1,101 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.reindex;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractXContentTestCase;
import org.elasticsearch.index.reindex.BulkByScrollTask.StatusOrException;
import java.io.IOException;
import java.util.function.Supplier;
import static org.hamcrest.Matchers.containsString;
public class BulkByScrollTaskStatusOrExceptionTests extends AbstractXContentTestCase<StatusOrException> {
@Override
protected StatusOrException createTestInstance() {
// failures are tested separately, so we can test XContent equivalence at least when we have no failures
return createTestInstanceWithoutExceptions();
}
static StatusOrException createTestInstanceWithoutExceptions() {
return new StatusOrException(BulkByScrollTaskStatusTests.randomStatusWithoutException());
}
static StatusOrException createTestInstanceWithExceptions() {
if (randomBoolean()) {
return new StatusOrException(new ElasticsearchException("test_exception"));
} else {
return new StatusOrException(BulkByScrollTaskStatusTests.randomStatus());
}
}
@Override
protected StatusOrException doParseInstance(XContentParser parser) throws IOException {
return StatusOrException.fromXContent(parser);
}
public static void assertEqualStatusOrException(StatusOrException expected, StatusOrException actual) {
if (expected != null && actual != null) {
assertNotSame(expected, actual);
if (expected.getException() == null) {
BulkByScrollTaskStatusTests.assertEqualStatus(expected.getStatus(), actual.getStatus());
} else {
assertThat(
actual.getException().getMessage(),
containsString(expected.getException().getMessage())
);
}
} else {
// If one of them is null both of them should be null
assertSame(expected, actual);
}
}
@Override
protected void assertEqualInstances(StatusOrException expected, StatusOrException actual) {
assertEqualStatusOrException(expected, actual);
}
@Override
protected boolean supportsUnknownFields() {
return true;
}
/**
* Test parsing {@link StatusOrException} with inner failures as they don't support asserting on xcontent equivalence, given that
* exceptions are not parsed back as the same original class. We run the usual {@link AbstractXContentTestCase#testFromXContent()}
* without failures, and this other test with failures where we disable asserting on xcontent equivalence at the end.
*/
public void testFromXContentWithFailures() throws IOException {
Supplier<StatusOrException> instanceSupplier = BulkByScrollTaskStatusOrExceptionTests::createTestInstanceWithExceptions;
//with random fields insertion in the inner exceptions, some random stuff may be parsed back as metadata,
//but that does not bother our assertions, as we only want to test that we don't break.
boolean supportsUnknownFields = true;
//exceptions are not of the same type whenever parsed back
boolean assertToXContentEquivalence = false;
AbstractXContentTestCase.testFromXContent(NUMBER_OF_TEST_RUNS, instanceSupplier, supportsUnknownFields, Strings.EMPTY_ARRAY,
getRandomFieldsExcludeFilter(), this::createParser, this::doParseInstance,
this::assertEqualInstances, assertToXContentEquivalence, ToXContent.EMPTY_PARAMS);
}
}

View File

@ -23,20 +23,27 @@ import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractXContentTestCase;
import org.hamcrest.Matchers;
import org.elasticsearch.index.reindex.BulkByScrollTask.Status;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import static java.lang.Math.abs;
import static java.util.stream.Collectors.toList;
import static org.apache.lucene.util.TestUtil.randomSimpleString;
import static org.elasticsearch.common.unit.TimeValue.parseTimeValue;
import static org.hamcrest.Matchers.equalTo;
public class BulkByScrollTaskStatusTests extends AbstractXContentTestCase<BulkByScrollTask.Status> {
public void testBulkByTaskStatus() throws IOException {
BulkByScrollTask.Status status = randomStatus();
BytesStreamOutput out = new BytesStreamOutput();
@ -98,6 +105,22 @@ public class BulkByScrollTaskStatusTests extends ESTestCase {
return new BulkByScrollTask.Status(statuses, randomBoolean() ? "test" : null);
}
public static BulkByScrollTask.Status randomStatusWithoutException() {
if (randomBoolean()) {
return randomWorkingStatus(null);
}
boolean canHaveNullStatuses = randomBoolean();
List<BulkByScrollTask.StatusOrException> statuses = IntStream.range(0, between(0, 10))
.mapToObj(i -> {
if (canHaveNullStatuses && LuceneTestCase.rarely()) {
return null;
}
return new BulkByScrollTask.StatusOrException(randomWorkingStatus(i));
})
.collect(toList());
return new BulkByScrollTask.Status(statuses, randomBoolean() ? "test" : null);
}
private static BulkByScrollTask.Status randomWorkingStatus(Integer sliceId) {
// These all should be believably small because we sum them if we have multiple workers
int total = between(0, 10000000);
@ -109,8 +132,65 @@ public class BulkByScrollTaskStatusTests extends ESTestCase {
long versionConflicts = between(0, total);
long bulkRetries = between(0, 10000000);
long searchRetries = between(0, 100000);
// the smallest unit of time rendered by toXContent is milliseconds
TimeUnit[] timeUnits = {TimeUnit.MILLISECONDS, TimeUnit.SECONDS, TimeUnit.MINUTES, TimeUnit.HOURS, TimeUnit.DAYS};
TimeValue throttled = new TimeValue(randomIntBetween(0, 1000), randomFrom(timeUnits));
TimeValue throttledUntil = new TimeValue(randomIntBetween(0, 1000), randomFrom(timeUnits));
return
new BulkByScrollTask.Status(
sliceId, total, updated, created, deleted, batches, versionConflicts, noops,
bulkRetries, searchRetries, throttled, abs(Randomness.get().nextFloat()),
randomBoolean() ? null : randomSimpleString(Randomness.get()), throttledUntil
);
}
public static void assertEqualStatus(BulkByScrollTask.Status expected, BulkByScrollTask.Status actual) {
assertNotSame(expected, actual);
assertTrue(expected.equalsWithoutSliceStatus(actual));
assertThat(expected.getSliceStatuses().size(), equalTo(actual.getSliceStatuses().size()));
for (int i = 0; i < expected.getSliceStatuses().size(); i++) {
BulkByScrollTaskStatusOrExceptionTests.assertEqualStatusOrException(
expected.getSliceStatuses().get(i),
actual.getSliceStatuses().get(i)
);
}
}
@Override
protected void assertEqualInstances(BulkByScrollTask.Status first, BulkByScrollTask.Status second) {
assertEqualStatus(first, second);
}
@Override
protected BulkByScrollTask.Status createTestInstance() {
// failures are tested separately, so we can test xcontent equivalence at least when we have no failures
return randomStatusWithoutException();
}
@Override
protected BulkByScrollTask.Status doParseInstance(XContentParser parser) throws IOException {
return BulkByScrollTask.Status.fromXContent(parser);
}
@Override
protected boolean supportsUnknownFields() {
return true;
}
/**
* Test parsing {@link Status} with inner failures as they don't support asserting on xcontent equivalence, given that
* exceptions are not parsed back as the same original class. We run the usual {@link AbstractXContentTestCase#testFromXContent()}
* without failures, and this other test with failures where we disable asserting on xcontent equivalence at the end.
*/
public void testFromXContentWithFailures() throws IOException {
Supplier<Status> instanceSupplier = BulkByScrollTaskStatusTests::randomStatus;
//with random fields insertion in the inner exceptions, some random stuff may be parsed back as metadata,
//but that does not bother our assertions, as we only want to test that we don't break.
boolean supportsUnknownFields = true;
//exceptions are not of the same type whenever parsed back
boolean assertToXContentEquivalence = false;
AbstractXContentTestCase.testFromXContent(NUMBER_OF_TEST_RUNS, instanceSupplier, supportsUnknownFields, Strings.EMPTY_ARRAY,
getRandomFieldsExcludeFilter(), this::createParser, this::doParseInstance,
this::assertEqualInstances, assertToXContentEquivalence, ToXContent.EMPTY_PARAMS);
}
}

View File

@ -20,8 +20,6 @@
package org.elasticsearch.index.reindex;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.search.slice.SliceBuilder;
@ -89,9 +87,9 @@ public class ReindexRequestTests extends AbstractBulkByScrollRequestTestCase<Rei
@Override
protected ReindexRequest newRequest() {
ReindexRequest reindex = new ReindexRequest();
reindex.setSourceIndices("source");
reindex.setDestIndex("dest");
return reindex;
}
}

View File

@ -6,8 +6,6 @@
package org.elasticsearch.xpack.upgrade;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ParentTaskAssigningClient;
@ -116,10 +114,10 @@ public class InternalIndexReindexer<T> {
private void reindex(ParentTaskAssigningClient parentAwareClient, String index, String newIndex,
ActionListener<BulkByScrollResponse> listener) {
ReindexRequest reindexRequest = new ReindexRequest();
reindexRequest.setSourceIndices(index);
reindexRequest.setSourceDocTypes(types);
reindexRequest.setDestIndex(newIndex);
reindexRequest.setRefresh(true);
reindexRequest.setScript(transformScript);
parentAwareClient.execute(ReindexAction.INSTANCE, reindexRequest, listener);