diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java
index 9dd316a0fb0..15bbde6b5bf 100644
--- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java
+++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java
@@ -106,6 +106,7 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.rankeval.RankEvalRequest;
+import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.protocol.xpack.XPackInfoRequest;
import org.elasticsearch.protocol.xpack.XPackUsageRequest;
import org.elasticsearch.protocol.xpack.license.DeleteLicenseRequest;
@@ -820,6 +821,21 @@ final class RequestConverters {
return request;
}
+ static Request reindex(ReindexRequest reindexRequest) throws IOException { // converts a high-level ReindexRequest into a low-level POST /_reindex request
+ String endpoint = new EndpointBuilder().addPathPart("_reindex").build(); // source/dest indices travel in the body, not the path
+ Request request = new Request(HttpPost.METHOD_NAME, endpoint);
+ Params params = new Params(request)
+ .withRefresh(reindexRequest.isRefresh())
+ .withTimeout(reindexRequest.getTimeout())
+ .withWaitForActiveShards(reindexRequest.getWaitForActiveShards());
+
+ if (reindexRequest.getScrollTime() != null) { // keep-alive for the search (scroll) phase of the reindex
+ params.putParam("scroll", reindexRequest.getScrollTime());
+ }
+ request.setEntity(createEntity(reindexRequest, REQUEST_BODY_CONTENT_TYPE)); // body carries source, dest, script, etc.
+ return request;
+ }
+
static Request rollover(RolloverRequest rolloverRequest) throws IOException {
String endpoint = new EndpointBuilder().addPathPart(rolloverRequest.getAlias()).addPathPartAsIs("_rollover")
.addPathPart(rolloverRequest.getNewIndexName()).build();
diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java
index c45ec048ae8..4ac5bfd080f 100644
--- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java
+++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java
@@ -64,6 +64,8 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.rankeval.RankEvalRequest;
import org.elasticsearch.index.rankeval.RankEvalResponse;
+import org.elasticsearch.index.reindex.BulkByScrollResponse;
+import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.plugins.spi.NamedXContentProvider;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestStatus;
@@ -395,6 +397,33 @@ public class RestHighLevelClient implements Closeable {
performRequestAsyncAndParseEntity(bulkRequest, RequestConverters::bulk, options, BulkResponse::fromXContent, listener, emptySet());
}
+ /**
+ * Executes a reindex request.
+ * See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html">Reindex API on elastic.co</a>
+ * @param reindexRequest the request
+ * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
+ * @return the response
+ * @throws IOException in case there is a problem sending the request or parsing back the response
+ */
+ public final BulkByScrollResponse reindex(ReindexRequest reindexRequest, RequestOptions options) throws IOException {
+ return performRequestAndParseEntity(
+ reindexRequest, RequestConverters::reindex, options, BulkByScrollResponse::fromXContent, emptySet()
+ );
+ }
+
+ /**
+ * Asynchronously executes a reindex request.
+ * See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html">Reindex API on elastic.co</a>
+ * @param reindexRequest the request
+ * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
+ * @param listener the listener to be notified upon request completion
+ */
+ public final void reindexAsync(ReindexRequest reindexRequest, RequestOptions options, ActionListener<BulkByScrollResponse> listener) {
+ performRequestAsyncAndParseEntity(
+ reindexRequest, RequestConverters::reindex, options, BulkByScrollResponse::fromXContent, listener, emptySet()
+ );
+ }
+
/**
* Pings the remote Elasticsearch cluster and returns true if the ping succeeded, false otherwise
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java
index 89f357477fa..7978d76c56d 100644
--- a/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java
+++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java
@@ -41,12 +41,16 @@ import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.get.GetResult;
+import org.elasticsearch.index.query.IdsQueryBuilder;
+import org.elasticsearch.index.reindex.BulkByScrollResponse;
+import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
@@ -624,6 +628,69 @@ public class CrudIT extends ESRestHighLevelClientTestCase {
validateBulkResponses(nbItems, errors, bulkResponse, bulkRequest);
}
+ public void testReindex() throws IOException {
+ final String sourceIndex = "source1";
+ final String destinationIndex = "dest";
+ {
+ // Prepare: single-shard indices (deterministic batch counts) and two seed docs in the source index
+ Settings settings = Settings.builder()
+ .put("number_of_shards", 1)
+ .put("number_of_replicas", 0)
+ .build();
+ createIndex(sourceIndex, settings);
+ createIndex(destinationIndex, settings);
+ assertEquals(
+ RestStatus.OK,
+ highLevelClient().bulk(
+ new BulkRequest()
+ .add(new IndexRequest(sourceIndex, "type", "1")
+ .source(Collections.singletonMap("foo", "bar"), XContentType.JSON))
+ .add(new IndexRequest(sourceIndex, "type", "2")
+ .source(Collections.singletonMap("foo2", "bar2"), XContentType.JSON))
+ .setRefreshPolicy(RefreshPolicy.IMMEDIATE),
+ RequestOptions.DEFAULT
+ ).status()
+ );
+ }
+ {
+ // test1: create one doc in dest (only doc id "1" matches the query)
+ ReindexRequest reindexRequest = new ReindexRequest();
+ reindexRequest.setSourceIndices(sourceIndex);
+ reindexRequest.setDestIndex(destinationIndex);
+ reindexRequest.setSourceQuery(new IdsQueryBuilder().addIds("1").types("type"));
+ reindexRequest.setRefresh(true); // make the copied doc visible before test2 reindexes over it
+ BulkByScrollResponse bulkResponse = execute(reindexRequest, highLevelClient()::reindex, highLevelClient()::reindexAsync);
+ assertEquals(1, bulkResponse.getCreated());
+ assertEquals(1, bulkResponse.getTotal());
+ assertEquals(0, bulkResponse.getDeleted());
+ assertEquals(0, bulkResponse.getNoops());
+ assertEquals(0, bulkResponse.getVersionConflicts());
+ assertEquals(1, bulkResponse.getBatches());
+ assertTrue(bulkResponse.getTook().getMillis() > 0);
+ assertEquals(1, bulkResponse.getBatches()); // NOTE(review): duplicate of the getBatches() assertion above
+ assertEquals(0, bulkResponse.getBulkFailures().size());
+ assertEquals(0, bulkResponse.getSearchFailures().size());
+ }
+ {
+ // test2: create 1 and update 1 (doc "1" already exists in dest, doc "2" does not)
+ ReindexRequest reindexRequest = new ReindexRequest();
+ reindexRequest.setSourceIndices(sourceIndex);
+ reindexRequest.setDestIndex(destinationIndex);
+ BulkByScrollResponse bulkResponse = execute(reindexRequest, highLevelClient()::reindex, highLevelClient()::reindexAsync);
+ assertEquals(1, bulkResponse.getCreated());
+ assertEquals(2, bulkResponse.getTotal());
+ assertEquals(1, bulkResponse.getUpdated());
+ assertEquals(0, bulkResponse.getDeleted());
+ assertEquals(0, bulkResponse.getNoops());
+ assertEquals(0, bulkResponse.getVersionConflicts());
+ assertEquals(1, bulkResponse.getBatches());
+ assertTrue(bulkResponse.getTook().getMillis() > 0);
+ assertEquals(1, bulkResponse.getBatches()); // NOTE(review): duplicate of the getBatches() assertion above
+ assertEquals(0, bulkResponse.getBulkFailures().size());
+ assertEquals(0, bulkResponse.getSearchFailures().size());
+ }
+ }
+
public void testBulkProcessorIntegration() throws IOException {
int nbItems = randomIntBetween(10, 100);
boolean[] errors = new boolean[nbItems];
diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java
index ebabb8f95b5..44b4ae05b57 100644
--- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java
+++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java
@@ -116,6 +116,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.RandomCreateIndexGenerator;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.query.QueryBuilder;
@@ -126,6 +127,8 @@ import org.elasticsearch.index.rankeval.RankEvalRequest;
import org.elasticsearch.index.rankeval.RankEvalSpec;
import org.elasticsearch.index.rankeval.RatedRequest;
import org.elasticsearch.index.rankeval.RestRankEvalAction;
+import org.elasticsearch.index.reindex.ReindexRequest;
+import org.elasticsearch.index.reindex.RemoteInfo;
import org.elasticsearch.protocol.xpack.XPackInfoRequest;
import org.elasticsearch.protocol.xpack.migration.IndexUpgradeInfoRequest;
import org.elasticsearch.protocol.xpack.watcher.DeleteWatchRequest;
@@ -172,6 +175,7 @@ import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
+import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;
import static org.elasticsearch.client.RequestConverters.REQUEST_BODY_CONTENT_TYPE;
import static org.elasticsearch.client.RequestConverters.enforceSameContentType;
@@ -179,6 +183,7 @@ import static org.elasticsearch.index.RandomCreateIndexGenerator.randomAliases;
import static org.elasticsearch.index.RandomCreateIndexGenerator.randomCreateIndexRequest;
import static org.elasticsearch.index.RandomCreateIndexGenerator.randomIndexSettings;
import static org.elasticsearch.index.alias.RandomAliasActionsGenerator.randomAliasAction;
+import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.search.RandomSearchRequestGenerator.randomSearchRequest;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertToXContentEquivalent;
import static org.hamcrest.CoreMatchers.equalTo;
@@ -407,6 +412,64 @@ public class RequestConvertersTests extends ESTestCase {
assertToXContentBody(indicesAliasesRequest, request.getEntity());
}
+ public void testReindex() throws IOException {
+ ReindexRequest reindexRequest = new ReindexRequest();
+ reindexRequest.setSourceIndices("source_idx");
+ reindexRequest.setDestIndex("dest_idx");
+ Map<String, String> expectedParams = new HashMap<>();
+ if (randomBoolean()) {
+ XContentBuilder builder = JsonXContent.contentBuilder().prettyPrint();
+ RemoteInfo remoteInfo = new RemoteInfo("http", "remote-host", 9200, null,
+ BytesReference.bytes(matchAllQuery().toXContent(builder, ToXContent.EMPTY_PARAMS)),
+ "user",
+ "pass",
+ emptyMap(),
+ RemoteInfo.DEFAULT_SOCKET_TIMEOUT,
+ RemoteInfo.DEFAULT_CONNECT_TIMEOUT
+ );
+ reindexRequest.setRemoteInfo(remoteInfo);
+ }
+ if (randomBoolean()) {
+ reindexRequest.setSourceDocTypes("doc", "tweet");
+ }
+ if (randomBoolean()) {
+ reindexRequest.setSourceBatchSize(randomInt(100));
+ }
+ if (randomBoolean()) {
+ reindexRequest.setDestDocType("tweet_and_doc");
+ }
+ if (randomBoolean()) {
+ reindexRequest.setDestOpType("create");
+ }
+ if (randomBoolean()) {
+ reindexRequest.setDestPipeline("my_pipeline");
+ }
+ if (randomBoolean()) {
+ reindexRequest.setDestRouting("=cat");
+ }
+ if (randomBoolean()) {
+ reindexRequest.setSize(randomIntBetween(100, 1000));
+ }
+ if (randomBoolean()) {
+ reindexRequest.setAbortOnVersionConflict(false);
+ }
+ if (randomBoolean()) {
+ String ts = randomTimeValue();
+ reindexRequest.setScroll(TimeValue.parseTimeValue(ts, "scroll"));
+ }
+ if (reindexRequest.getRemoteInfo() == null && randomBoolean()) {
+ reindexRequest.setSourceQuery(new TermQueryBuilder("foo", "fooval"));
+ }
+ setRandomTimeout(reindexRequest::setTimeout, ReplicationRequest.DEFAULT_TIMEOUT, expectedParams);
+ setRandomWaitForActiveShards(reindexRequest::setWaitForActiveShards, ActiveShardCount.DEFAULT, expectedParams);
+ expectedParams.put("scroll", reindexRequest.getScrollTime().getStringRep());
+ Request request = RequestConverters.reindex(reindexRequest);
+ assertEquals("/_reindex", request.getEndpoint());
+ assertEquals(HttpPost.METHOD_NAME, request.getMethod());
+ assertEquals(expectedParams, request.getParameters());
+ assertToXContentBody(reindexRequest, request.getEntity());
+ }
+
public void testPutMapping() throws IOException {
PutMappingRequest putMappingRequest = new PutMappingRequest();
diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java
index 15f2b80b252..d2585b6f3f4 100644
--- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java
+++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java
@@ -660,7 +660,6 @@ public class RestHighLevelClientTests extends ESTestCase {
"indices.put_alias",
"mtermvectors",
"put_script",
- "reindex",
"reindex_rethrottle",
"render_search_template",
"scripts_painless_execute",
diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java
index ad41c139ddc..9c69a2a4836 100644
--- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java
+++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java
@@ -50,6 +50,8 @@ import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
@@ -59,13 +61,22 @@ import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.get.GetResult;
+import org.elasticsearch.index.query.MatchAllQueryBuilder;
+import org.elasticsearch.index.query.TermQueryBuilder;
+import org.elasticsearch.index.reindex.BulkByScrollResponse;
+import org.elasticsearch.index.reindex.ReindexRequest;
+import org.elasticsearch.index.reindex.RemoteInfo;
+import org.elasticsearch.index.reindex.ScrollableHitSource;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
+import org.elasticsearch.search.sort.SortOrder;
+import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -750,6 +761,144 @@ public class CRUDDocumentationIT extends ESRestHighLevelClientTestCase {
}
}
+ public void testReindex() throws Exception {
+ RestHighLevelClient client = highLevelClient();
+ {
+ String mapping =
+ "\"doc\": {\n" +
+ " \"properties\": {\n" +
+ " \"user\": {\n" +
+ " \"type\": \"text\"\n" +
+ " },\n" +
+ " \"field1\": {\n" +
+ " \"type\": \"integer\"\n" +
+ " },\n" +
+ " \"field2\": {\n" +
+ " \"type\": \"integer\"\n" +
+ " }\n" +
+ " }\n" +
+ " }";
+ createIndex("source1", Settings.EMPTY, mapping);
+ createIndex("source2", Settings.EMPTY, mapping);
+ createPipeline("my_pipeline");
+ }
+ {
+ // tag::reindex-request
+ ReindexRequest request = new ReindexRequest(); // <1>
+ request.setSourceIndices("source1", "source2"); // <2>
+ request.setDestIndex("dest"); // <3>
+ // end::reindex-request
+ // tag::reindex-request-versionType
+ request.setDestVersionType(VersionType.EXTERNAL); // <1>
+ // end::reindex-request-versionType
+ // tag::reindex-request-opType
+ request.setDestOpType("create"); // <1>
+ // end::reindex-request-opType
+ // tag::reindex-request-conflicts
+ request.setConflicts("proceed"); // <1>
+ // end::reindex-request-conflicts
+ // tag::reindex-request-typeOrQuery
+ request.setSourceDocTypes("doc"); // <1>
+ request.setSourceQuery(new TermQueryBuilder("user", "kimchy")); // <2>
+ // end::reindex-request-typeOrQuery
+ // tag::reindex-request-size
+ request.setSize(10); // <1>
+ // end::reindex-request-size
+ // tag::reindex-request-sourceSize
+ request.setSourceBatchSize(100); // <1>
+ // end::reindex-request-sourceSize
+ // tag::reindex-request-pipeline
+ request.setDestPipeline("my_pipeline"); // <1>
+ // end::reindex-request-pipeline
+ // tag::reindex-request-sort
+ request.addSortField("field1", SortOrder.DESC); // <1>
+ request.addSortField("field2", SortOrder.ASC); // <2>
+ // end::reindex-request-sort
+ // tag::reindex-request-script
+ request.setScript(
+ new Script(
+ ScriptType.INLINE, "painless",
+ "if (ctx._source.user == 'kimchy') {ctx._source.likes++;}",
+ Collections.emptyMap())); // <1>
+ // end::reindex-request-script
+ // tag::reindex-request-remote
+ request.setRemoteInfo(
+ new RemoteInfo(
+ "https", "localhost", 9002, null, new BytesArray(new MatchAllQueryBuilder().toString()),
+ "user", "pass", Collections.emptyMap(), new TimeValue(100, TimeUnit.MILLISECONDS),
+ new TimeValue(100, TimeUnit.SECONDS)
+ )
+ ); // <1>
+ // end::reindex-request-remote
+ request.setRemoteInfo(null); // Remove it for tests
+ // tag::reindex-request-timeout
+ request.setTimeout(TimeValue.timeValueMinutes(2)); // <1>
+ // end::reindex-request-timeout
+ // tag::reindex-request-refresh
+ request.setRefresh(true); // <1>
+ // end::reindex-request-refresh
+ // tag::reindex-request-slices
+ request.setSlices(2); // <1>
+ // end::reindex-request-slices
+ // tag::reindex-request-scroll
+ request.setScroll(TimeValue.timeValueMinutes(10)); // <1>
+ // end::reindex-request-scroll
+
+
+ // tag::reindex-execute
+ BulkByScrollResponse bulkResponse = client.reindex(request, RequestOptions.DEFAULT);
+ // end::reindex-execute
+ assertSame(0, bulkResponse.getSearchFailures().size());
+ assertSame(0, bulkResponse.getBulkFailures().size());
+ // tag::reindex-response
+ TimeValue timeTaken = bulkResponse.getTook(); // <1>
+ boolean timedOut = bulkResponse.isTimedOut(); // <2>
+ long totalDocs = bulkResponse.getTotal(); // <3>
+ long updatedDocs = bulkResponse.getUpdated(); // <4>
+ long createdDocs = bulkResponse.getCreated(); // <5>
+ long deletedDocs = bulkResponse.getDeleted(); // <6>
+ long batches = bulkResponse.getBatches(); // <7>
+ long noops = bulkResponse.getNoops(); // <8>
+ long versionConflicts = bulkResponse.getVersionConflicts(); // <9>
+ long bulkRetries = bulkResponse.getBulkRetries(); // <10>
+ long searchRetries = bulkResponse.getSearchRetries(); // <11>
+ TimeValue throttledMillis = bulkResponse.getStatus().getThrottled(); // <12>
+ TimeValue throttledUntilMillis = bulkResponse.getStatus().getThrottledUntil(); // <13>
+ List searchFailures = bulkResponse.getSearchFailures(); // <14>
+ List bulkFailures = bulkResponse.getBulkFailures(); // <15>
+ // end::reindex-response
+ }
+ {
+ ReindexRequest request = new ReindexRequest();
+ request.setSourceIndices("source1");
+ request.setDestIndex("dest");
+
+ // tag::reindex-execute-listener
+ ActionListener listener = new ActionListener() {
+ @Override
+ public void onResponse(BulkByScrollResponse bulkResponse) {
+ // <1>
+ }
+
+ @Override
+ public void onFailure(Exception e) {
+ // <2>
+ }
+ };
+ // end::reindex-execute-listener
+
+ // Replace the empty listener by a blocking listener in test
+ final CountDownLatch latch = new CountDownLatch(1);
+ listener = new LatchedActionListener<>(listener, latch);
+
+ // tag::reindex-execute-async
+ client.reindexAsync(request, RequestOptions.DEFAULT, listener); // <1>
+ // end::reindex-execute-async
+
+ assertTrue(latch.await(30L, TimeUnit.SECONDS));
+ }
+ }
+
public void testGet() throws Exception {
RestHighLevelClient client = highLevelClient();
{
diff --git a/docs/java-rest/high-level/document/reindex.asciidoc b/docs/java-rest/high-level/document/reindex.asciidoc
new file mode 100644
index 00000000000..b6d98b42dc5
--- /dev/null
+++ b/docs/java-rest/high-level/document/reindex.asciidoc
@@ -0,0 +1,215 @@
+[[java-rest-high-document-reindex]]
+=== Reindex API
+
+[[java-rest-high-document-reindex-request]]
+==== Reindex Request
+
+A `ReindexRequest` can be used to copy documents from one or more indexes into a destination index.
+
+It requires an existing source index and a target index which may or may not exist pre-request. Reindex does not attempt
+to set up the destination index. It does not copy the settings of the source index. You should set up the destination
+index prior to running a _reindex action, including setting up mappings, shard counts, replicas, etc.
+
+The simplest form of a `ReindexRequest` looks like follows:
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-request]
+--------------------------------------------------
+<1> Creates the `ReindexRequest`
+<2> Adds a list of sources to copy from
+<3> Adds the destination index
+
+The `dest` element can be configured like the index API to control optimistic concurrency control. Just leaving out
+`versionType` (as above) or setting it to internal will cause Elasticsearch to blindly dump documents into the target.
+Setting `versionType` to external will cause Elasticsearch to preserve the version from the source, create any documents
+that are missing, and update any documents that have an older version in the destination index than they do in the
+source index.
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-request-versionType]
+--------------------------------------------------
+<1> Set the versionType to `EXTERNAL`
+
+Setting `opType` to `create` will cause `_reindex` to only create missing documents in the target index. All existing
+documents will cause a version conflict. The default `opType` is `index`.
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-request-opType]
+--------------------------------------------------
+<1> Set the opType to `create`
+
+By default version conflicts abort the `_reindex` process but you can just count them by setting it to `proceed`
+in the request body
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-request-conflicts]
+--------------------------------------------------
+<1> Set `proceed` on version conflict
+
+You can limit the documents by adding a type to the source or by adding a query.
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-request-typeOrQuery]
+--------------------------------------------------
+<1> Only copy `doc` type
+<2> Only copy documents which have field `user` set to `kimchy`
+
+It’s also possible to limit the number of processed documents by setting size.
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-request-size]
+--------------------------------------------------
+<1> Only copy 10 documents
+
+By default `_reindex` uses batches of 1000. You can change the batch size with `sourceBatchSize`.
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-request-sourceSize]
+--------------------------------------------------
+<1> Use batches of 100 documents
+
+Reindex can also use the ingest feature by specifying a `pipeline`.
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-request-pipeline]
+--------------------------------------------------
+<1> set pipeline to `my_pipeline`
+
+If you want a particular set of documents from the source index you’ll need to use sort. If possible, prefer a more
+selective query to size and sort.
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-request-sort]
+--------------------------------------------------
+<1> add a descending sort on `field1`
+<2> add an ascending sort on `field2`
+
+`ReindexRequest` also supports a `script` that modifies the document. It allows you to also change the document's
+metadata. The following example illustrates that.
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-request-script]
+--------------------------------------------------
+<1> `setScript` to increment the `likes` field on all documents with user `kimchy`.
+
+`ReindexRequest` supports reindexing from a remote Elasticsearch cluster. When using a remote cluster the query should be
+specified inside the `RemoteInfo` object and not using `setSourceQuery`. If both the remote info and the source query are
+set it results in a validation error during the request. The reason for this is that the remote Elasticsearch may not
+understand queries built by the modern query builders. The remote cluster support works all the way back to Elasticsearch
+0.90 and the query language has changed since then. When reaching older versions, it is safer to write the query by hand
+in JSON.
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-request-remote]
+--------------------------------------------------
+<1> set the remote Elasticsearch cluster to reindex from
+
+`ReindexRequest` also helps in automatically parallelizing using `sliced-scroll` to
+slice on `_uid`. Use `setSlices` to specify the number of slices to use.
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-request-slices]
+--------------------------------------------------
+<1> set number of slices to use
+
+`ReindexRequest` uses the `scroll` parameter to control how long it keeps the "search context" alive.
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-request-scroll]
+--------------------------------------------------
+<1> set scroll time
+
+
+==== Optional arguments
+In addition to the options above the following arguments can optionally be also provided:
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-request-timeout]
+--------------------------------------------------
+<1> Timeout to wait for the reindex request to be performed as a `TimeValue`
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-request-refresh]
+--------------------------------------------------
+<1> Refresh index after calling reindex
+
+
+[[java-rest-high-document-reindex-sync]]
+==== Synchronous Execution
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-execute]
+--------------------------------------------------
+
+[[java-rest-high-document-reindex-async]]
+==== Asynchronous Execution
+
+The asynchronous execution of a reindex request requires both the `ReindexRequest`
+instance and an `ActionListener` instance to be passed to the asynchronous
+method:
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-execute-async]
+--------------------------------------------------
+<1> The `ReindexRequest` to execute and the `ActionListener` to use when
+the execution completes
+
+The asynchronous method does not block and returns immediately. Once it is
+completed the `ActionListener` is called back using the `onResponse` method
+if the execution successfully completed or using the `onFailure` method if
+it failed.
+
+A typical listener for `BulkByScrollResponse` looks like:
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-execute-listener]
+--------------------------------------------------
+<1> Called when the execution is successfully completed. The response is
+provided as an argument and contains a list of individual results for each
+operation that was executed. Note that one or more operations might have
+failed while the others have been successfully executed.
+<2> Called when the whole `ReindexRequest` fails. In this case the raised
+exception is provided as an argument and no operation has been executed.
+
+[[java-rest-high-document-reindex-response]]
+==== Reindex Response
+
+The returned `BulkByScrollResponse` contains information about the executed operations and
+ allows iterating over each result as follows:
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-response]
+--------------------------------------------------
+<1> Get total time taken
+<2> Check if the request timed out
+<3> Get total number of docs processed
+<4> Number of docs that were updated
+<5> Number of docs that were created
+<6> Number of docs that were deleted
+<7> Number of batches that were executed
+<8> Number of skipped docs
+<9> Number of version conflicts
+<10> Number of times request had to retry bulk index operations
+<11> Number of times request had to retry search operations
+<12> The total time this request has throttled itself not including the current throttle time if it is currently sleeping
+<13> Remaining delay of any current throttle sleep or 0 if not sleeping
+<14> Failures during search phase
+<15> Failures during bulk index operation
diff --git a/docs/java-rest/high-level/supported-apis.asciidoc b/docs/java-rest/high-level/supported-apis.asciidoc
index e04e391f3e0..64c95912b5e 100644
--- a/docs/java-rest/high-level/supported-apis.asciidoc
+++ b/docs/java-rest/high-level/supported-apis.asciidoc
@@ -15,6 +15,7 @@ Single document APIs::
Multi-document APIs::
* <<java-rest-high-document-bulk>>
* <<java-rest-high-document-multi-get>>
+* <<java-rest-high-document-reindex>>
include::document/index.asciidoc[]
include::document/get.asciidoc[]
@@ -23,6 +24,7 @@ include::document/delete.asciidoc[]
include::document/update.asciidoc[]
include::document/bulk.asciidoc[]
include::document/multi-get.asciidoc[]
+include::document/reindex.asciidoc[]
== Search APIs
diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestReindexAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestReindexAction.java
index a5520c90b0f..50d01535d7f 100644
--- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestReindexAction.java
+++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestReindexAction.java
@@ -20,7 +20,6 @@
package org.elasticsearch.index.reindex;
import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
@@ -118,7 +117,7 @@ public class RestReindexAction extends AbstractBaseReindexRestHandler PARSER =
+ new ConstructingObjectParser<>(
+ "bulk_failures",
+ true,
+ a ->
+ new Failure(
+ (String)a[0], (String)a[1], (String)a[2], (Exception)a[3], RestStatus.fromCode((int)a[4])
+ )
+ );
+ static {
+ PARSER.declareString(constructorArg(), new ParseField(INDEX_FIELD));
+ PARSER.declareString(constructorArg(), new ParseField(TYPE_FIELD));
+ PARSER.declareString(optionalConstructorArg(), new ParseField(ID_FIELD));
+ PARSER.declareObject(constructorArg(), (p, c) -> ElasticsearchException.fromXContent(p), new ParseField(CAUSE_FIELD));
+ PARSER.declareInt(constructorArg(), new ParseField(STATUS_FIELD));
+ }
+
/**
* For write failures before operation was assigned a sequence number.
*
@@ -322,6 +343,10 @@ public class BulkItemResponse implements Streamable, StatusToXContentObject {
return builder;
}
+ public static Failure fromXContent(XContentParser parser) {
+ return PARSER.apply(parser, null);
+ }
+
@Override
public String toString() {
return Strings.toString(this);
diff --git a/server/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java b/server/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java
index 8536337bfdb..3b635c82387 100644
--- a/server/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java
+++ b/server/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java
@@ -252,6 +252,14 @@ public abstract class AbstractBulkByScrollRequest searchFailures;
private boolean timedOut;
+ private static final String TOOK_FIELD = "took";
+ private static final String TIMED_OUT_FIELD = "timed_out";
+ private static final String FAILURES_FIELD = "failures";
+
+ @SuppressWarnings("unchecked")
+ private static final ObjectParser<BulkByScrollResponseBuilder, Void> PARSER =
+ new ObjectParser<>(
+ "bulk_by_scroll_response",
+ true,
+ BulkByScrollResponseBuilder::new
+ );
+ static {
+ PARSER.declareLong(BulkByScrollResponseBuilder::setTook, new ParseField(TOOK_FIELD));
+ PARSER.declareBoolean(BulkByScrollResponseBuilder::setTimedOut, new ParseField(TIMED_OUT_FIELD));
+ PARSER.declareObjectArray(
+ BulkByScrollResponseBuilder::setFailures, (p, c) -> parseFailure(p), new ParseField(FAILURES_FIELD)
+ );
+ // since the fields of BulkByScrollResponse.Status are mixed into the top-level response body, we also parse them here
+ Status.declareFields(PARSER);
+ }
+
public BulkByScrollResponse() {
}
@@ -87,6 +118,10 @@ public class BulkByScrollResponse extends ActionResponse implements ToXContentFr
return status.getCreated();
}
+ public long getTotal() {
+ return status.getTotal();
+ }
+
public long getDeleted() {
return status.getDeleted();
}
@@ -171,8 +206,8 @@ public class BulkByScrollResponse extends ActionResponse implements ToXContentFr
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
- builder.field("took", took.millis());
- builder.field("timed_out", timedOut);
+ builder.field(TOOK_FIELD, took.millis());
+ builder.field(TIMED_OUT_FIELD, timedOut);
status.innerXContent(builder, params);
builder.startArray("failures");
for (Failure failure: bulkFailures) {
@@ -187,6 +222,80 @@ public class BulkByScrollResponse extends ActionResponse implements ToXContentFr
return builder;
}
+ public static BulkByScrollResponse fromXContent(XContentParser parser) {
+ return PARSER.apply(parser, null).buildResponse();
+ }
+
+ private static Object parseFailure(XContentParser parser) throws IOException {
+ ensureExpectedToken(Token.START_OBJECT, parser.currentToken(), parser::getTokenLocation);
+ Token token;
+ String index = null;
+ String type = null;
+ String id = null;
+ Integer status = null;
+ Integer shardId = null;
+ String nodeId = null;
+ ElasticsearchException bulkExc = null;
+ ElasticsearchException searchExc = null;
+ while ((token = parser.nextToken()) != Token.END_OBJECT) {
+ ensureExpectedToken(Token.FIELD_NAME, token, parser::getTokenLocation);
+ String name = parser.currentName();
+ token = parser.nextToken();
+ if (token == Token.START_ARRAY) {
+ parser.skipChildren();
+ } else if (token == Token.START_OBJECT) {
+ switch (name) {
+ case SearchFailure.REASON_FIELD:
+ bulkExc = ElasticsearchException.fromXContent(parser);
+ break;
+ case Failure.CAUSE_FIELD:
+ searchExc = ElasticsearchException.fromXContent(parser);
+ break;
+ default:
+ parser.skipChildren();
+ }
+ } else if (token == Token.VALUE_STRING) {
+ switch (name) {
+ // This field is the same as SearchFailure.index
+ case Failure.INDEX_FIELD:
+ index = parser.text();
+ break;
+ case Failure.TYPE_FIELD:
+ type = parser.text();
+ break;
+ case Failure.ID_FIELD:
+ id = parser.text();
+ break;
+ case SearchFailure.NODE_FIELD:
+ nodeId = parser.text();
+ break;
+ default:
+ // Do nothing
+ break;
+ }
+ } else if (token == Token.VALUE_NUMBER) {
+ switch (name) {
+ case Failure.STATUS_FIELD:
+ status = parser.intValue();
+ break;
+ case SearchFailure.SHARD_FIELD:
+ shardId = parser.intValue();
+ break;
+ default:
+ // Do nothing
+ break;
+ }
+ }
+ }
+ if (bulkExc != null) {
+ return new Failure(index, type, id, bulkExc, RestStatus.fromCode(status));
+ } else if (searchExc != null) {
+ return new SearchFailure(searchExc, index, shardId, nodeId);
+ } else {
+ throw new ElasticsearchParseException("failed to parse failures array. At least one of {reason,cause} must be present");
+ }
+ }
+
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
diff --git a/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollResponseBuilder.java b/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollResponseBuilder.java
new file mode 100644
index 00000000000..ad5bfd6e03c
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollResponseBuilder.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.reindex;
+
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
+import org.elasticsearch.common.xcontent.ObjectParser;
+import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure;
+import org.elasticsearch.index.reindex.BulkByScrollTask.StatusBuilder;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Helps build a {@link BulkByScrollResponse}. Used by an instance of {@link ObjectParser} when parsing from XContent.
+ */
+class BulkByScrollResponseBuilder extends StatusBuilder {
+ private TimeValue took;
+ private BulkByScrollTask.Status status;
+ private List<Failure> bulkFailures = new ArrayList<>();
+ private List<SearchFailure> searchFailures = new ArrayList<>();
+ private boolean timedOut;
+
+ BulkByScrollResponseBuilder() {}
+
+ public void setTook(long took) {
+ setTook(new TimeValue(took, TimeUnit.MILLISECONDS));
+ }
+
+ public void setTook(TimeValue took) {
+ this.took = took;
+ }
+
+ public void setStatus(BulkByScrollTask.Status status) {
+ this.status = status;
+ }
+
+ public void setFailures(List