Merge branch 'master' into ccr
* master: HLREST: add update by query API (#32760)
This commit is contained in:
commit
ea4eef8641
|
@ -107,7 +107,9 @@ import org.elasticsearch.common.xcontent.XContentParser;
|
|||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.index.VersionType;
|
||||
import org.elasticsearch.index.rankeval.RankEvalRequest;
|
||||
import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest;
|
||||
import org.elasticsearch.index.reindex.ReindexRequest;
|
||||
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
|
||||
import org.elasticsearch.protocol.xpack.XPackInfoRequest;
|
||||
import org.elasticsearch.protocol.xpack.XPackUsageRequest;
|
||||
import org.elasticsearch.protocol.xpack.license.DeleteLicenseRequest;
|
||||
|
@ -837,6 +839,33 @@ final class RequestConverters {
|
|||
return request;
|
||||
}
|
||||
|
||||
static Request updateByQuery(UpdateByQueryRequest updateByQueryRequest) throws IOException {
|
||||
String endpoint =
|
||||
endpoint(updateByQueryRequest.indices(), updateByQueryRequest.getDocTypes(), "_update_by_query");
|
||||
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
|
||||
Params params = new Params(request)
|
||||
.withRouting(updateByQueryRequest.getRouting())
|
||||
.withPipeline(updateByQueryRequest.getPipeline())
|
||||
.withRefresh(updateByQueryRequest.isRefresh())
|
||||
.withTimeout(updateByQueryRequest.getTimeout())
|
||||
.withWaitForActiveShards(updateByQueryRequest.getWaitForActiveShards())
|
||||
.withIndicesOptions(updateByQueryRequest.indicesOptions());
|
||||
if (updateByQueryRequest.isAbortOnVersionConflict() == false) {
|
||||
params.putParam("conflicts", "proceed");
|
||||
}
|
||||
if (updateByQueryRequest.getBatchSize() != AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE) {
|
||||
params.putParam("scroll_size", Integer.toString(updateByQueryRequest.getBatchSize()));
|
||||
}
|
||||
if (updateByQueryRequest.getScrollTime() != AbstractBulkByScrollRequest.DEFAULT_SCROLL_TIMEOUT) {
|
||||
params.putParam("scroll", updateByQueryRequest.getScrollTime());
|
||||
}
|
||||
if (updateByQueryRequest.getSize() > 0) {
|
||||
params.putParam("size", Integer.toString(updateByQueryRequest.getSize()));
|
||||
}
|
||||
request.setEntity(createEntity(updateByQueryRequest, REQUEST_BODY_CONTENT_TYPE));
|
||||
return request;
|
||||
}
|
||||
|
||||
static Request rollover(RolloverRequest rolloverRequest) throws IOException {
|
||||
String endpoint = new EndpointBuilder().addPathPart(rolloverRequest.getAlias()).addPathPartAsIs("_rollover")
|
||||
.addPathPart(rolloverRequest.getNewIndexName()).build();
|
||||
|
|
|
@ -66,6 +66,7 @@ import org.elasticsearch.index.rankeval.RankEvalRequest;
|
|||
import org.elasticsearch.index.rankeval.RankEvalResponse;
|
||||
import org.elasticsearch.index.reindex.BulkByScrollResponse;
|
||||
import org.elasticsearch.index.reindex.ReindexRequest;
|
||||
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
|
||||
import org.elasticsearch.plugins.spi.NamedXContentProvider;
|
||||
import org.elasticsearch.rest.BytesRestResponse;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
|
@ -424,6 +425,35 @@ public class RestHighLevelClient implements Closeable {
|
|||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes a update by query request.
|
||||
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html">
|
||||
* Update By Query API on elastic.co</a>
|
||||
* @param updateByQueryRequest the request
|
||||
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
|
||||
* @return the response
|
||||
* @throws IOException in case there is a problem sending the request or parsing back the response
|
||||
*/
|
||||
public final BulkByScrollResponse updateByQuery(UpdateByQueryRequest updateByQueryRequest, RequestOptions options) throws IOException {
|
||||
return performRequestAndParseEntity(
|
||||
updateByQueryRequest, RequestConverters::updateByQuery, options, BulkByScrollResponse::fromXContent, emptySet()
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Asynchronously executes an update by query request.
|
||||
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html">
|
||||
* Update By Query API on elastic.co</a>
|
||||
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
|
||||
* @param listener the listener to be notified upon request completion
|
||||
*/
|
||||
public final void updateByQueryAsync(UpdateByQueryRequest reindexRequest, RequestOptions options,
|
||||
ActionListener<BulkByScrollResponse> listener) {
|
||||
performRequestAsyncAndParseEntity(
|
||||
reindexRequest, RequestConverters::updateByQuery, options, BulkByScrollResponse::fromXContent, listener, emptySet()
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Pings the remote Elasticsearch cluster and returns true if the ping succeeded, false otherwise
|
||||
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
|
||||
|
|
|
@ -51,6 +51,7 @@ import org.elasticsearch.index.get.GetResult;
|
|||
import org.elasticsearch.index.query.IdsQueryBuilder;
|
||||
import org.elasticsearch.index.reindex.BulkByScrollResponse;
|
||||
import org.elasticsearch.index.reindex.ReindexRequest;
|
||||
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.script.Script;
|
||||
import org.elasticsearch.script.ScriptType;
|
||||
|
@ -691,6 +692,72 @@ public class CrudIT extends ESRestHighLevelClientTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testUpdateByQuery() throws IOException {
|
||||
final String sourceIndex = "source1";
|
||||
{
|
||||
// Prepare
|
||||
Settings settings = Settings.builder()
|
||||
.put("number_of_shards", 1)
|
||||
.put("number_of_replicas", 0)
|
||||
.build();
|
||||
createIndex(sourceIndex, settings);
|
||||
assertEquals(
|
||||
RestStatus.OK,
|
||||
highLevelClient().bulk(
|
||||
new BulkRequest()
|
||||
.add(new IndexRequest(sourceIndex, "type", "1")
|
||||
.source(Collections.singletonMap("foo", 1), XContentType.JSON))
|
||||
.add(new IndexRequest(sourceIndex, "type", "2")
|
||||
.source(Collections.singletonMap("foo", 2), XContentType.JSON))
|
||||
.setRefreshPolicy(RefreshPolicy.IMMEDIATE),
|
||||
RequestOptions.DEFAULT
|
||||
).status()
|
||||
);
|
||||
}
|
||||
{
|
||||
// test1: create one doc in dest
|
||||
UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest();
|
||||
updateByQueryRequest.indices(sourceIndex);
|
||||
updateByQueryRequest.setQuery(new IdsQueryBuilder().addIds("1").types("type"));
|
||||
updateByQueryRequest.setRefresh(true);
|
||||
BulkByScrollResponse bulkResponse =
|
||||
execute(updateByQueryRequest, highLevelClient()::updateByQuery, highLevelClient()::updateByQueryAsync);
|
||||
assertEquals(1, bulkResponse.getTotal());
|
||||
assertEquals(1, bulkResponse.getUpdated());
|
||||
assertEquals(0, bulkResponse.getNoops());
|
||||
assertEquals(0, bulkResponse.getVersionConflicts());
|
||||
assertEquals(1, bulkResponse.getBatches());
|
||||
assertTrue(bulkResponse.getTook().getMillis() > 0);
|
||||
assertEquals(1, bulkResponse.getBatches());
|
||||
assertEquals(0, bulkResponse.getBulkFailures().size());
|
||||
assertEquals(0, bulkResponse.getSearchFailures().size());
|
||||
}
|
||||
{
|
||||
// test2: update using script
|
||||
UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest();
|
||||
updateByQueryRequest.indices(sourceIndex);
|
||||
updateByQueryRequest.setScript(new Script("if (ctx._source.foo == 2) ctx._source.foo++;"));
|
||||
updateByQueryRequest.setRefresh(true);
|
||||
BulkByScrollResponse bulkResponse =
|
||||
execute(updateByQueryRequest, highLevelClient()::updateByQuery, highLevelClient()::updateByQueryAsync);
|
||||
assertEquals(2, bulkResponse.getTotal());
|
||||
assertEquals(2, bulkResponse.getUpdated());
|
||||
assertEquals(0, bulkResponse.getDeleted());
|
||||
assertEquals(0, bulkResponse.getNoops());
|
||||
assertEquals(0, bulkResponse.getVersionConflicts());
|
||||
assertEquals(1, bulkResponse.getBatches());
|
||||
assertTrue(bulkResponse.getTook().getMillis() > 0);
|
||||
assertEquals(1, bulkResponse.getBatches());
|
||||
assertEquals(0, bulkResponse.getBulkFailures().size());
|
||||
assertEquals(0, bulkResponse.getSearchFailures().size());
|
||||
assertEquals(
|
||||
3,
|
||||
(int) (highLevelClient().get(new GetRequest(sourceIndex, "type", "2"), RequestOptions.DEFAULT)
|
||||
.getSourceAsMap().get("foo"))
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
public void testBulkProcessorIntegration() throws IOException {
|
||||
int nbItems = randomIntBetween(10, 100);
|
||||
boolean[] errors = new boolean[nbItems];
|
||||
|
|
|
@ -129,6 +129,7 @@ import org.elasticsearch.index.rankeval.RatedRequest;
|
|||
import org.elasticsearch.index.rankeval.RestRankEvalAction;
|
||||
import org.elasticsearch.index.reindex.ReindexRequest;
|
||||
import org.elasticsearch.index.reindex.RemoteInfo;
|
||||
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
|
||||
import org.elasticsearch.protocol.xpack.XPackInfoRequest;
|
||||
import org.elasticsearch.protocol.xpack.migration.IndexUpgradeInfoRequest;
|
||||
import org.elasticsearch.protocol.xpack.watcher.DeleteWatchRequest;
|
||||
|
@ -137,6 +138,7 @@ import org.elasticsearch.protocol.xpack.graph.Hop;
|
|||
import org.elasticsearch.protocol.xpack.watcher.PutWatchRequest;
|
||||
import org.elasticsearch.repositories.fs.FsRepository;
|
||||
import org.elasticsearch.rest.action.search.RestSearchAction;
|
||||
import org.elasticsearch.script.Script;
|
||||
import org.elasticsearch.script.ScriptType;
|
||||
import org.elasticsearch.script.mustache.MultiSearchTemplateRequest;
|
||||
import org.elasticsearch.script.mustache.SearchTemplateRequest;
|
||||
|
@ -470,6 +472,60 @@ public class RequestConvertersTests extends ESTestCase {
|
|||
assertToXContentBody(reindexRequest, request.getEntity());
|
||||
}
|
||||
|
||||
public void testUpdateByQuery() throws IOException {
|
||||
UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest();
|
||||
updateByQueryRequest.indices(randomIndicesNames(1, 5));
|
||||
Map<String, String> expectedParams = new HashMap<>();
|
||||
if (randomBoolean()) {
|
||||
updateByQueryRequest.setDocTypes(generateRandomStringArray(5, 5, false, false));
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
int batchSize = randomInt(100);
|
||||
updateByQueryRequest.setBatchSize(batchSize);
|
||||
expectedParams.put("scroll_size", Integer.toString(batchSize));
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
updateByQueryRequest.setPipeline("my_pipeline");
|
||||
expectedParams.put("pipeline", "my_pipeline");
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
updateByQueryRequest.setRouting("=cat");
|
||||
expectedParams.put("routing", "=cat");
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
int size = randomIntBetween(100, 1000);
|
||||
updateByQueryRequest.setSize(size);
|
||||
expectedParams.put("size", Integer.toString(size));
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
updateByQueryRequest.setAbortOnVersionConflict(false);
|
||||
expectedParams.put("conflicts", "proceed");
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
String ts = randomTimeValue();
|
||||
updateByQueryRequest.setScroll(TimeValue.parseTimeValue(ts, "scroll"));
|
||||
expectedParams.put("scroll", ts);
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
updateByQueryRequest.setQuery(new TermQueryBuilder("foo", "fooval"));
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
updateByQueryRequest.setScript(new Script("ctx._source.last = \"lastname\""));
|
||||
}
|
||||
setRandomIndicesOptions(updateByQueryRequest::setIndicesOptions, updateByQueryRequest::indicesOptions, expectedParams);
|
||||
setRandomTimeout(updateByQueryRequest::setTimeout, ReplicationRequest.DEFAULT_TIMEOUT, expectedParams);
|
||||
Request request = RequestConverters.updateByQuery(updateByQueryRequest);
|
||||
StringJoiner joiner = new StringJoiner("/", "/", "");
|
||||
joiner.add(String.join(",", updateByQueryRequest.indices()));
|
||||
if (updateByQueryRequest.getDocTypes().length > 0)
|
||||
joiner.add(String.join(",", updateByQueryRequest.getDocTypes()));
|
||||
joiner.add("_update_by_query");
|
||||
assertEquals(joiner.toString(), request.getEndpoint());
|
||||
assertEquals(HttpPost.METHOD_NAME, request.getMethod());
|
||||
assertEquals(expectedParams, request.getParameters());
|
||||
assertToXContentBody(updateByQueryRequest, request.getEntity());
|
||||
}
|
||||
|
||||
public void testPutMapping() throws IOException {
|
||||
PutMappingRequest putMappingRequest = new PutMappingRequest();
|
||||
|
||||
|
|
|
@ -664,8 +664,7 @@ public class RestHighLevelClientTests extends ESTestCase {
|
|||
"render_search_template",
|
||||
"scripts_painless_execute",
|
||||
"tasks.get",
|
||||
"termvectors",
|
||||
"update_by_query"
|
||||
"termvectors"
|
||||
};
|
||||
//These API are not required for high-level client feature completeness
|
||||
String[] notRequiredApi = new String[] {
|
||||
|
|
|
@ -39,6 +39,7 @@ import org.elasticsearch.action.get.MultiGetResponse;
|
|||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.index.IndexResponse;
|
||||
import org.elasticsearch.action.support.ActiveShardCount;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.action.support.WriteRequest;
|
||||
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
|
||||
import org.elasticsearch.action.support.replication.ReplicationResponse;
|
||||
|
@ -67,6 +68,7 @@ import org.elasticsearch.index.reindex.BulkByScrollResponse;
|
|||
import org.elasticsearch.index.reindex.ReindexRequest;
|
||||
import org.elasticsearch.index.reindex.RemoteInfo;
|
||||
import org.elasticsearch.index.reindex.ScrollableHitSource;
|
||||
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.script.Script;
|
||||
import org.elasticsearch.script.ScriptType;
|
||||
|
@ -899,6 +901,125 @@ public class CRUDDocumentationIT extends ESRestHighLevelClientTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testUpdateByQuery() throws Exception {
|
||||
RestHighLevelClient client = highLevelClient();
|
||||
{
|
||||
String mapping =
|
||||
"\"doc\": {\n" +
|
||||
" \"properties\": {\n" +
|
||||
" \"user\": {\n" +
|
||||
" \"type\": \"text\"\n" +
|
||||
" },\n" +
|
||||
" \"field1\": {\n" +
|
||||
" \"type\": \"integer\"\n" +
|
||||
" },\n" +
|
||||
" \"field2\": {\n" +
|
||||
" \"type\": \"integer\"\n" +
|
||||
" }\n" +
|
||||
" }\n" +
|
||||
" }";
|
||||
createIndex("source1", Settings.EMPTY, mapping);
|
||||
createIndex("source2", Settings.EMPTY, mapping);
|
||||
createPipeline("my_pipeline");
|
||||
}
|
||||
{
|
||||
// tag::update-by-query-request
|
||||
UpdateByQueryRequest request = new UpdateByQueryRequest("source1", "source2"); // <1>
|
||||
// end::update-by-query-request
|
||||
// tag::update-by-query-request-conflicts
|
||||
request.setConflicts("proceed"); // <1>
|
||||
// end::update-by-query-request-conflicts
|
||||
// tag::update-by-query-request-typeOrQuery
|
||||
request.setDocTypes("doc"); // <1>
|
||||
request.setQuery(new TermQueryBuilder("user", "kimchy")); // <2>
|
||||
// end::update-by-query-request-typeOrQuery
|
||||
// tag::update-by-query-request-size
|
||||
request.setSize(10); // <1>
|
||||
// end::update-by-query-request-size
|
||||
// tag::update-by-query-request-scrollSize
|
||||
request.setBatchSize(100); // <1>
|
||||
// end::update-by-query-request-scrollSize
|
||||
// tag::update-by-query-request-pipeline
|
||||
request.setPipeline("my_pipeline"); // <1>
|
||||
// end::update-by-query-request-pipeline
|
||||
// tag::update-by-query-request-script
|
||||
request.setScript(
|
||||
new Script(
|
||||
ScriptType.INLINE, "painless",
|
||||
"if (ctx._source.user == 'kimchy') {ctx._source.likes++;}",
|
||||
Collections.emptyMap())); // <1>
|
||||
// end::update-by-query-request-script
|
||||
// tag::update-by-query-request-timeout
|
||||
request.setTimeout(TimeValue.timeValueMinutes(2)); // <1>
|
||||
// end::update-by-query-request-timeout
|
||||
// tag::update-by-query-request-refresh
|
||||
request.setRefresh(true); // <1>
|
||||
// end::update-by-query-request-refresh
|
||||
// tag::update-by-query-request-slices
|
||||
request.setSlices(2); // <1>
|
||||
// end::update-by-query-request-slices
|
||||
// tag::update-by-query-request-scroll
|
||||
request.setScroll(TimeValue.timeValueMinutes(10)); // <1>
|
||||
// end::update-by-query-request-scroll
|
||||
// tag::update-by-query-request-routing
|
||||
request.setRouting("=cat"); // <1>
|
||||
// end::update-by-query-request-routing
|
||||
// tag::update-by-query-request-indicesOptions
|
||||
request.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN); // <1>
|
||||
// end::update-by-query-request-indicesOptions
|
||||
|
||||
// tag::update-by-query-execute
|
||||
BulkByScrollResponse bulkResponse = client.updateByQuery(request, RequestOptions.DEFAULT);
|
||||
// end::update-by-query-execute
|
||||
assertSame(0, bulkResponse.getSearchFailures().size());
|
||||
assertSame(0, bulkResponse.getBulkFailures().size());
|
||||
// tag::update-by-query-response
|
||||
TimeValue timeTaken = bulkResponse.getTook(); // <1>
|
||||
boolean timedOut = bulkResponse.isTimedOut(); // <2>
|
||||
long totalDocs = bulkResponse.getTotal(); // <3>
|
||||
long updatedDocs = bulkResponse.getUpdated(); // <4>
|
||||
long deletedDocs = bulkResponse.getDeleted(); // <5>
|
||||
long batches = bulkResponse.getBatches(); // <6>
|
||||
long noops = bulkResponse.getNoops(); // <7>
|
||||
long versionConflicts = bulkResponse.getVersionConflicts(); // <8>
|
||||
long bulkRetries = bulkResponse.getBulkRetries(); // <9>
|
||||
long searchRetries = bulkResponse.getSearchRetries(); // <10>
|
||||
TimeValue throttledMillis = bulkResponse.getStatus().getThrottled(); // <11>
|
||||
TimeValue throttledUntilMillis = bulkResponse.getStatus().getThrottledUntil(); // <12>
|
||||
List<ScrollableHitSource.SearchFailure> searchFailures = bulkResponse.getSearchFailures(); // <13>
|
||||
List<BulkItemResponse.Failure> bulkFailures = bulkResponse.getBulkFailures(); // <14>
|
||||
// end::update-by-query-response
|
||||
}
|
||||
{
|
||||
UpdateByQueryRequest request = new UpdateByQueryRequest();
|
||||
request.indices("source1");
|
||||
|
||||
// tag::update-by-query-execute-listener
|
||||
ActionListener<BulkByScrollResponse> listener = new ActionListener<BulkByScrollResponse>() {
|
||||
@Override
|
||||
public void onResponse(BulkByScrollResponse bulkResponse) {
|
||||
// <1>
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
// <2>
|
||||
}
|
||||
};
|
||||
// end::update-by-query-execute-listener
|
||||
|
||||
// Replace the empty listener by a blocking listener in test
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
listener = new LatchedActionListener<>(listener, latch);
|
||||
|
||||
// tag::update-by-query-execute-async
|
||||
client.updateByQueryAsync(request, RequestOptions.DEFAULT, listener); // <1>
|
||||
// end::update-by-query-execute-async
|
||||
|
||||
assertTrue(latch.await(30L, TimeUnit.SECONDS));
|
||||
}
|
||||
}
|
||||
|
||||
public void testGet() throws Exception {
|
||||
RestHighLevelClient client = highLevelClient();
|
||||
{
|
||||
|
|
|
@ -0,0 +1,181 @@
|
|||
[[java-rest-high-document-update-by-query]]
|
||||
=== Update By Query API
|
||||
|
||||
[[java-rest-high-document-update-by-query-request]]
|
||||
==== Update By Query Request
|
||||
|
||||
A `UpdateByQueryRequest` can be used to update documents in an index.
|
||||
|
||||
It requires an existing index (or a set of indices) on which the update is to be performed.
|
||||
|
||||
The simplest form of a `UpdateByQueryRequest` looks like follows:
|
||||
|
||||
["source","java",subs="attributes,callouts,macros"]
|
||||
--------------------------------------------------
|
||||
include-tagged::{doc-tests}/CRUDDocumentationIT.java[update-by-query-request]
|
||||
--------------------------------------------------
|
||||
<1> Creates the `UpdateByQueryRequest` on a set of indices.
|
||||
|
||||
By default version conflicts abort the `UpdateByQueryRequest` process but you can just count them by settings it to
|
||||
`proceed` in the request body
|
||||
|
||||
["source","java",subs="attributes,callouts,macros"]
|
||||
--------------------------------------------------
|
||||
include-tagged::{doc-tests}/CRUDDocumentationIT.java[update-by-query-request-conflicts]
|
||||
--------------------------------------------------
|
||||
<1> Set `proceed` on version conflict
|
||||
|
||||
You can limit the documents by adding a type to the source or by adding a query.
|
||||
|
||||
["source","java",subs="attributes,callouts,macros"]
|
||||
--------------------------------------------------
|
||||
include-tagged::{doc-tests}/CRUDDocumentationIT.java[update-by-query-request-typeOrQuery]
|
||||
--------------------------------------------------
|
||||
<1> Only copy `doc` type
|
||||
<2> Only copy documents which have field `user` set to `kimchy`
|
||||
|
||||
It’s also possible to limit the number of processed documents by setting size.
|
||||
|
||||
["source","java",subs="attributes,callouts,macros"]
|
||||
--------------------------------------------------
|
||||
include-tagged::{doc-tests}/CRUDDocumentationIT.java[update-by-query-request-size]
|
||||
--------------------------------------------------
|
||||
<1> Only copy 10 documents
|
||||
|
||||
By default `UpdateByQueryRequest` uses batches of 1000. You can change the batch size with `setBatchSize`.
|
||||
|
||||
["source","java",subs="attributes,callouts,macros"]
|
||||
--------------------------------------------------
|
||||
include-tagged::{doc-tests}/CRUDDocumentationIT.java[update-by-query-request-scrollSize]
|
||||
--------------------------------------------------
|
||||
<1> Use batches of 100 documents
|
||||
|
||||
Update by query can also use the ingest feature by specifying a `pipeline`.
|
||||
|
||||
["source","java",subs="attributes,callouts,macros"]
|
||||
--------------------------------------------------
|
||||
include-tagged::{doc-tests}/CRUDDocumentationIT.java[update-by-query-request-pipeline]
|
||||
--------------------------------------------------
|
||||
<1> set pipeline to `my_pipeline`
|
||||
|
||||
`UpdateByQueryRequest` also supports a `script` that modifies the document. The following example illustrates that.
|
||||
|
||||
["source","java",subs="attributes,callouts,macros"]
|
||||
--------------------------------------------------
|
||||
include-tagged::{doc-tests}/CRUDDocumentationIT.java[update-by-query-request-script]
|
||||
--------------------------------------------------
|
||||
<1> `setScript` to increment the `likes` field on all documents with user `kimchy`.
|
||||
|
||||
`UpdateByQueryRequest` also helps in automatically parallelizing using `sliced-scroll` to
|
||||
slice on `_uid`. Use `setSlices` to specify the number of slices to use.
|
||||
|
||||
["source","java",subs="attributes,callouts,macros"]
|
||||
--------------------------------------------------
|
||||
include-tagged::{doc-tests}/CRUDDocumentationIT.java[update-by-query-request-slices]
|
||||
--------------------------------------------------
|
||||
<1> set number of slices to use
|
||||
|
||||
`UpdateByQueryRequest` uses the `scroll` parameter to control how long it keeps the "search context" alive.
|
||||
|
||||
["source","java",subs="attributes,callouts,macros"]
|
||||
--------------------------------------------------
|
||||
include-tagged::{doc-tests}/CRUDDocumentationIT.java[update-by-query-request-scroll]
|
||||
--------------------------------------------------
|
||||
<1> set scroll time
|
||||
|
||||
If you provide routing then the routing is copied to the scroll query, limiting the process to the shards that match
|
||||
that routing value.
|
||||
|
||||
["source","java",subs="attributes,callouts,macros"]
|
||||
--------------------------------------------------
|
||||
include-tagged::{doc-tests}/CRUDDocumentationIT.java[update-by-query-request-routing]
|
||||
--------------------------------------------------
|
||||
<1> set routing
|
||||
|
||||
|
||||
==== Optional arguments
|
||||
In addition to the options above the following arguments can optionally be also provided:
|
||||
|
||||
["source","java",subs="attributes,callouts,macros"]
|
||||
--------------------------------------------------
|
||||
include-tagged::{doc-tests}/CRUDDocumentationIT.java[update-by-query-request-timeout]
|
||||
--------------------------------------------------
|
||||
<1> Timeout to wait for the update by query request to be performed as a `TimeValue`
|
||||
|
||||
["source","java",subs="attributes,callouts,macros"]
|
||||
--------------------------------------------------
|
||||
include-tagged::{doc-tests}/CRUDDocumentationIT.java[update-by-query-request-refresh]
|
||||
--------------------------------------------------
|
||||
<1> Refresh index after calling update by query
|
||||
|
||||
["source","java",subs="attributes,callouts,macros"]
|
||||
--------------------------------------------------
|
||||
include-tagged::{doc-tests}/CRUDDocumentationIT.java[update-by-query-request-indicesOptions]
|
||||
--------------------------------------------------
|
||||
<1> Set indices options
|
||||
|
||||
|
||||
[[java-rest-high-document-update-by-query-sync]]
|
||||
==== Synchronous Execution
|
||||
|
||||
["source","java",subs="attributes,callouts,macros"]
|
||||
--------------------------------------------------
|
||||
include-tagged::{doc-tests}/CRUDDocumentationIT.java[update-by-query-execute]
|
||||
--------------------------------------------------
|
||||
|
||||
[[java-rest-high-document-update-by-query-async]]
|
||||
==== Asynchronous Execution
|
||||
|
||||
The asynchronous execution of an update by query request requires both the `UpdateByQueryRequest`
|
||||
instance and an `ActionListener` instance to be passed to the asynchronous
|
||||
method:
|
||||
|
||||
["source","java",subs="attributes,callouts,macros"]
|
||||
--------------------------------------------------
|
||||
include-tagged::{doc-tests}/CRUDDocumentationIT.java[update-by-query-execute-async]
|
||||
--------------------------------------------------
|
||||
<1> The `UpdateByQueryRequest` to execute and the `ActionListener` to use when
|
||||
the execution completes
|
||||
|
||||
The asynchronous method does not block and returns immediately. Once it is
|
||||
completed the `ActionListener` is called back using the `onResponse` method
|
||||
if the execution successfully completed or using the `onFailure` method if
|
||||
it failed.
|
||||
|
||||
A typical listener for `BulkByScrollResponse` looks like:
|
||||
|
||||
["source","java",subs="attributes,callouts,macros"]
|
||||
--------------------------------------------------
|
||||
include-tagged::{doc-tests}/CRUDDocumentationIT.java[update-by-query-execute-listener]
|
||||
--------------------------------------------------
|
||||
<1> Called when the execution is successfully completed. The response is
|
||||
provided as an argument and contains a list of individual results for each
|
||||
operation that was executed. Note that one or more operations might have
|
||||
failed while the others have been successfully executed.
|
||||
<2> Called when the whole `UpdateByQueryRequest` fails. In this case the raised
|
||||
exception is provided as an argument and no operation has been executed.
|
||||
|
||||
[[java-rest-high-document-update-by-query-execute-listener-response]]
|
||||
==== Update By Query Response
|
||||
|
||||
The returned `BulkByScrollResponse` contains information about the executed operations and
|
||||
allows to iterate over each result as follows:
|
||||
|
||||
["source","java",subs="attributes,callouts,macros"]
|
||||
--------------------------------------------------
|
||||
include-tagged::{doc-tests}/CRUDDocumentationIT.java[update-by-query-response]
|
||||
--------------------------------------------------
|
||||
<1> Get total time taken
|
||||
<2> Check if the request timed out
|
||||
<3> Get total number of docs processed
|
||||
<4> Number of docs that were updated
|
||||
<5> Number of docs that were deleted
|
||||
<6> Number of batches that were executed
|
||||
<7> Number of skipped docs
|
||||
<8> Number of version conflicts
|
||||
<9> Number of times request had to retry bulk index operations
|
||||
<10> Number of times request had to retry search operations
|
||||
<11> The total time this request has throttled itself not including the current throttle time if it is currently sleeping
|
||||
<12> Remaining delay of any current throttle sleep or 0 if not sleeping
|
||||
<13> Failures during search phase
|
||||
<14> Failures during bulk index operation
|
|
@ -16,6 +16,7 @@ Multi-document APIs::
|
|||
* <<java-rest-high-document-bulk>>
|
||||
* <<java-rest-high-document-multi-get>>
|
||||
* <<java-rest-high-document-reindex>>
|
||||
* <<java-rest-high-document-update-by-query>>
|
||||
|
||||
include::document/index.asciidoc[]
|
||||
include::document/get.asciidoc[]
|
||||
|
@ -25,6 +26,7 @@ include::document/update.asciidoc[]
|
|||
include::document/bulk.asciidoc[]
|
||||
include::document/multi-get.asciidoc[]
|
||||
include::document/reindex.asciidoc[]
|
||||
include::document/update-by-query.asciidoc[]
|
||||
|
||||
== Search APIs
|
||||
|
||||
|
|
|
@ -20,7 +20,6 @@
|
|||
package org.elasticsearch.index.reindex;
|
||||
|
||||
import org.elasticsearch.ElasticsearchParseException;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.client.node.NodeClient;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
|
||||
|
@ -63,7 +62,7 @@ public class RestUpdateByQueryAction extends AbstractBulkByQueryRestHandler<Upda
|
|||
* it to set its own defaults which differ from SearchRequest's
|
||||
* defaults. Then the parse can override them.
|
||||
*/
|
||||
UpdateByQueryRequest internal = new UpdateByQueryRequest(new SearchRequest());
|
||||
UpdateByQueryRequest internal = new UpdateByQueryRequest();
|
||||
|
||||
Map<String, Consumer<Object>> consumers = new HashMap<>();
|
||||
consumers.put("conflicts", o -> internal.setConflicts((String) o));
|
||||
|
|
|
@ -81,7 +81,7 @@ public class RoundTripTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testUpdateByQueryRequest() throws IOException {
|
||||
UpdateByQueryRequest update = new UpdateByQueryRequest(new SearchRequest());
|
||||
UpdateByQueryRequest update = new UpdateByQueryRequest();
|
||||
randomRequest(update);
|
||||
if (randomBoolean()) {
|
||||
update.setPipeline(randomAlphaOfLength(5));
|
||||
|
|
|
@ -21,7 +21,6 @@ package org.elasticsearch.index.reindex;
|
|||
|
||||
import org.elasticsearch.index.reindex.ScrollableHitSource.Hit;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
||||
public class UpdateByQueryMetadataTests
|
||||
|
@ -39,7 +38,7 @@ public class UpdateByQueryMetadataTests
|
|||
|
||||
@Override
|
||||
protected UpdateByQueryRequest request() {
|
||||
return new UpdateByQueryRequest(new SearchRequest());
|
||||
return new UpdateByQueryRequest();
|
||||
}
|
||||
|
||||
private class TestAction extends TransportUpdateByQueryAction.AsyncIndexBySearchAction {
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
|
||||
package org.elasticsearch.index.reindex;
|
||||
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.script.ScriptService;
|
||||
|
||||
|
@ -50,7 +49,7 @@ public class UpdateByQueryWithScriptTests
|
|||
|
||||
@Override
|
||||
protected UpdateByQueryRequest request() {
|
||||
return new UpdateByQueryRequest(new SearchRequest());
|
||||
return new UpdateByQueryRequest();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -44,8 +44,8 @@ import static org.elasticsearch.common.unit.TimeValue.timeValueMinutes;
|
|||
public abstract class AbstractBulkByScrollRequest<Self extends AbstractBulkByScrollRequest<Self>> extends ActionRequest {
|
||||
|
||||
public static final int SIZE_ALL_MATCHES = -1;
|
||||
static final TimeValue DEFAULT_SCROLL_TIMEOUT = timeValueMinutes(5);
|
||||
static final int DEFAULT_SCROLL_SIZE = 1000;
|
||||
public static final TimeValue DEFAULT_SCROLL_TIMEOUT = timeValueMinutes(5);
|
||||
public static final int DEFAULT_SCROLL_SIZE = 1000;
|
||||
|
||||
public static final int AUTO_SLICES = 0;
|
||||
public static final String AUTO_SLICES_VALUE = "auto";
|
||||
|
|
|
@ -209,8 +209,8 @@ public class BulkByScrollTask extends CancellableTask {
|
|||
public static class StatusBuilder {
|
||||
private Integer sliceId = null;
|
||||
private Long total = null;
|
||||
private Long updated = null;
|
||||
private Long created = null;
|
||||
private long updated = 0; // Not present during deleteByQuery
|
||||
private long created = 0; // Not present during updateByQuery
|
||||
private Long deleted = null;
|
||||
private Integer batches = null;
|
||||
private Long versionConflicts = null;
|
||||
|
@ -221,7 +221,7 @@ public class BulkByScrollTask extends CancellableTask {
|
|||
private Float requestsPerSecond = null;
|
||||
private String reasonCancelled = null;
|
||||
private TimeValue throttledUntil = null;
|
||||
private List<StatusOrException> sliceStatuses = emptyList();
|
||||
private List<StatusOrException> sliceStatuses = new ArrayList<>();
|
||||
|
||||
public void setSliceId(Integer sliceId) {
|
||||
this.sliceId = sliceId;
|
||||
|
@ -295,10 +295,14 @@ public class BulkByScrollTask extends CancellableTask {
|
|||
|
||||
public void setSliceStatuses(List<StatusOrException> sliceStatuses) {
|
||||
if (sliceStatuses != null) {
|
||||
this.sliceStatuses = sliceStatuses;
|
||||
this.sliceStatuses.addAll(sliceStatuses);
|
||||
}
|
||||
}
|
||||
|
||||
public void addToSliceStatuses(StatusOrException statusOrException) {
|
||||
this.sliceStatuses.add(statusOrException);
|
||||
}
|
||||
|
||||
public Status buildStatus() {
|
||||
if (sliceStatuses.isEmpty()) {
|
||||
try {
|
||||
|
@ -613,37 +617,20 @@ public class BulkByScrollTask extends CancellableTask {
|
|||
Token token = parser.currentToken();
|
||||
String fieldName = parser.currentName();
|
||||
ensureExpectedToken(XContentParser.Token.FIELD_NAME, token, parser::getTokenLocation);
|
||||
Integer sliceId = null;
|
||||
Long total = null;
|
||||
Long updated = null;
|
||||
Long created = null;
|
||||
Long deleted = null;
|
||||
Integer batches = null;
|
||||
Long versionConflicts = null;
|
||||
Long noOps = null;
|
||||
Long bulkRetries = null;
|
||||
Long searchRetries = null;
|
||||
TimeValue throttled = null;
|
||||
Float requestsPerSecond = null;
|
||||
String reasonCancelled = null;
|
||||
TimeValue throttledUntil = null;
|
||||
List<StatusOrException> sliceStatuses = new ArrayList<>();
|
||||
StatusBuilder builder = new StatusBuilder();
|
||||
while ((token = parser.nextToken()) != Token.END_OBJECT) {
|
||||
if (token == Token.FIELD_NAME) {
|
||||
fieldName = parser.currentName();
|
||||
} else if (token == Token.START_OBJECT) {
|
||||
if (fieldName.equals(Status.RETRIES_FIELD)) {
|
||||
Tuple<Long, Long> retries =
|
||||
Status.RETRIES_PARSER.parse(parser, null);
|
||||
bulkRetries = retries.v1();
|
||||
searchRetries = retries.v2();
|
||||
builder.setRetries(Status.RETRIES_PARSER.parse(parser, null));
|
||||
} else {
|
||||
parser.skipChildren();
|
||||
}
|
||||
} else if (token == Token.START_ARRAY) {
|
||||
if (fieldName.equals(Status.SLICES_FIELD)) {
|
||||
while ((token = parser.nextToken()) != Token.END_ARRAY) {
|
||||
sliceStatuses.add(StatusOrException.fromXContent(parser));
|
||||
builder.addToSliceStatuses(StatusOrException.fromXContent(parser));
|
||||
}
|
||||
} else {
|
||||
parser.skipChildren();
|
||||
|
@ -651,57 +638,47 @@ public class BulkByScrollTask extends CancellableTask {
|
|||
} else { // else if it is a value
|
||||
switch (fieldName) {
|
||||
case Status.SLICE_ID_FIELD:
|
||||
sliceId = parser.intValue();
|
||||
builder.setSliceId(parser.intValue());
|
||||
break;
|
||||
case Status.TOTAL_FIELD:
|
||||
total = parser.longValue();
|
||||
builder.setTotal(parser.longValue());
|
||||
break;
|
||||
case Status.UPDATED_FIELD:
|
||||
updated = parser.longValue();
|
||||
builder.setUpdated(parser.longValue());
|
||||
break;
|
||||
case Status.CREATED_FIELD:
|
||||
created = parser.longValue();
|
||||
builder.setCreated(parser.longValue());
|
||||
break;
|
||||
case Status.DELETED_FIELD:
|
||||
deleted = parser.longValue();
|
||||
builder.setDeleted(parser.longValue());
|
||||
break;
|
||||
case Status.BATCHES_FIELD:
|
||||
batches = parser.intValue();
|
||||
builder.setBatches(parser.intValue());
|
||||
break;
|
||||
case Status.VERSION_CONFLICTS_FIELD:
|
||||
versionConflicts = parser.longValue();
|
||||
builder.setVersionConflicts(parser.longValue());
|
||||
break;
|
||||
case Status.NOOPS_FIELD:
|
||||
noOps = parser.longValue();
|
||||
builder.setNoops(parser.longValue());
|
||||
break;
|
||||
case Status.THROTTLED_RAW_FIELD:
|
||||
throttled = new TimeValue(parser.longValue(), TimeUnit.MILLISECONDS);
|
||||
builder.setThrottled(parser.longValue());
|
||||
break;
|
||||
case Status.REQUESTS_PER_SEC_FIELD:
|
||||
requestsPerSecond = parser.floatValue();
|
||||
requestsPerSecond = requestsPerSecond == -1 ? Float.POSITIVE_INFINITY : requestsPerSecond;
|
||||
builder.setRequestsPerSecond(parser.floatValue());
|
||||
break;
|
||||
case Status.CANCELED_FIELD:
|
||||
reasonCancelled = parser.text();
|
||||
builder.setReasonCancelled(parser.text());
|
||||
break;
|
||||
case Status.THROTTLED_UNTIL_RAW_FIELD:
|
||||
throttledUntil = new TimeValue(parser.longValue(), TimeUnit.MILLISECONDS);
|
||||
builder.setThrottledUntil(parser.longValue());
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (sliceStatuses.isEmpty()) {
|
||||
return
|
||||
new Status(
|
||||
sliceId, total, updated, created, deleted, batches, versionConflicts, noOps, bulkRetries,
|
||||
searchRetries, throttled, requestsPerSecond, reasonCancelled, throttledUntil
|
||||
);
|
||||
} else {
|
||||
return new Status(sliceStatuses, reasonCancelled);
|
||||
}
|
||||
|
||||
return builder.buildStatus();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -838,15 +815,15 @@ public class BulkByScrollTask extends CancellableTask {
|
|||
);
|
||||
}
|
||||
|
||||
public boolean equalsWithoutSliceStatus(Object o) {
|
||||
public boolean equalsWithoutSliceStatus(Object o, boolean includeUpdated, boolean includeCreated) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
Status other = (Status) o;
|
||||
return
|
||||
Objects.equals(sliceId, other.sliceId) &&
|
||||
total == other.total &&
|
||||
updated == other.updated &&
|
||||
created == other.created &&
|
||||
(!includeUpdated || updated == other.updated) &&
|
||||
(!includeCreated || created == other.created) &&
|
||||
deleted == other.deleted &&
|
||||
batches == other.batches &&
|
||||
versionConflicts == other.versionConflicts &&
|
||||
|
@ -861,7 +838,7 @@ public class BulkByScrollTask extends CancellableTask {
|
|||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (equalsWithoutSliceStatus(o)) {
|
||||
if (equalsWithoutSliceStatus(o, true, true)) {
|
||||
return Objects.equals(sliceStatuses, ((Status) o).sliceStatuses);
|
||||
} else {
|
||||
return false;
|
||||
|
|
|
@ -24,6 +24,9 @@ import org.elasticsearch.action.search.SearchRequest;
|
|||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.index.query.QueryBuilder;
|
||||
import org.elasticsearch.tasks.TaskId;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -34,16 +37,22 @@ import java.io.IOException;
|
|||
* representative set of subrequests. This is best-effort but better than {@linkplain ReindexRequest} because scripts can't change the
|
||||
* destination index and things.
|
||||
*/
|
||||
public class UpdateByQueryRequest extends AbstractBulkIndexByScrollRequest<UpdateByQueryRequest> implements IndicesRequest.Replaceable {
|
||||
public class UpdateByQueryRequest extends AbstractBulkIndexByScrollRequest<UpdateByQueryRequest>
|
||||
implements IndicesRequest.Replaceable, ToXContentObject {
|
||||
/**
|
||||
* Ingest pipeline to set on index requests made by this action.
|
||||
*/
|
||||
private String pipeline;
|
||||
|
||||
public UpdateByQueryRequest() {
|
||||
this(new SearchRequest());
|
||||
}
|
||||
|
||||
public UpdateByQueryRequest(SearchRequest search) {
|
||||
public UpdateByQueryRequest(String... indices) {
|
||||
this(new SearchRequest(indices));
|
||||
}
|
||||
|
||||
UpdateByQueryRequest(SearchRequest search) {
|
||||
this(search, true);
|
||||
}
|
||||
|
||||
|
@ -59,8 +68,81 @@ public class UpdateByQueryRequest extends AbstractBulkIndexByScrollRequest<Updat
|
|||
/**
|
||||
* Set the ingest pipeline to set on index requests made by this action.
|
||||
*/
|
||||
public void setPipeline(String pipeline) {
|
||||
public UpdateByQueryRequest setPipeline(String pipeline) {
|
||||
this.pipeline = pipeline;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the query for selective update
|
||||
*/
|
||||
public UpdateByQueryRequest setQuery(QueryBuilder query) {
|
||||
if (query != null) {
|
||||
getSearchRequest().source().query(query);
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the document types for the update
|
||||
*/
|
||||
public UpdateByQueryRequest setDocTypes(String... types) {
|
||||
if (types != null) {
|
||||
getSearchRequest().types(types);
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set routing limiting the process to the shards that match that routing value
|
||||
*/
|
||||
public UpdateByQueryRequest setRouting(String routing) {
|
||||
if (routing != null) {
|
||||
getSearchRequest().routing(routing);
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* The scroll size to control number of documents processed per batch
|
||||
*/
|
||||
public UpdateByQueryRequest setBatchSize(int size) {
|
||||
getSearchRequest().source().size(size);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the IndicesOptions for controlling unavailable indices
|
||||
*/
|
||||
public UpdateByQueryRequest setIndicesOptions(IndicesOptions indicesOptions) {
|
||||
getSearchRequest().indicesOptions(indicesOptions);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the batch size for this request
|
||||
*/
|
||||
public int getBatchSize() {
|
||||
return getSearchRequest().source().size();
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the routing value used for this request
|
||||
*/
|
||||
public String getRouting() {
|
||||
return getSearchRequest().routing();
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the document types on which this request would be executed. Returns an empty array if all
|
||||
* types are to be processed.
|
||||
*/
|
||||
public String[] getDocTypes() {
|
||||
if (getSearchRequest().types() != null) {
|
||||
return getSearchRequest().types();
|
||||
} else {
|
||||
return new String[0];
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -121,4 +203,16 @@ public class UpdateByQueryRequest extends AbstractBulkIndexByScrollRequest<Updat
|
|||
super.writeTo(out);
|
||||
out.writeOptionalString(pipeline);
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
if (getScript() != null) {
|
||||
builder.field("script");
|
||||
getScript().toXContent(builder, params);
|
||||
}
|
||||
getSearchRequest().source().innerToXContent(builder, params);
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,11 +24,15 @@ import org.elasticsearch.Version;
|
|||
import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.test.AbstractXContentTestCase;
|
||||
import org.elasticsearch.index.reindex.BulkByScrollTask.Status;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static java.util.Collections.emptyList;
|
||||
import static java.util.Collections.singletonList;
|
||||
|
@ -38,6 +42,9 @@ import static org.hamcrest.Matchers.containsString;
|
|||
|
||||
public class BulkByScrollResponseTests extends AbstractXContentTestCase<BulkByScrollResponse> {
|
||||
|
||||
private boolean includeUpdated;
|
||||
private boolean includeCreated;
|
||||
|
||||
public void testRountTrip() throws IOException {
|
||||
BulkByScrollResponse response = new BulkByScrollResponse(timeValueMillis(randomNonNegativeLong()),
|
||||
BulkByScrollTaskStatusTests.randomStatus(), randomIndexingFailures(), randomSearchFailures(), randomBoolean());
|
||||
|
@ -97,10 +104,11 @@ public class BulkByScrollResponseTests extends AbstractXContentTestCase<BulkBySc
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void assertEqualInstances(BulkByScrollResponse expected, BulkByScrollResponse actual) {
|
||||
public static void assertEqualBulkResponse(BulkByScrollResponse expected, BulkByScrollResponse actual,
|
||||
boolean includeUpdated, boolean includeCreated) {
|
||||
assertEquals(expected.getTook(), actual.getTook());
|
||||
BulkByScrollTaskStatusTests.assertEqualStatus(expected.getStatus(), actual.getStatus());
|
||||
BulkByScrollTaskStatusTests
|
||||
.assertEqualStatus(expected.getStatus(), actual.getStatus(), includeUpdated, includeCreated);
|
||||
assertEquals(expected.getBulkFailures().size(), actual.getBulkFailures().size());
|
||||
for (int i = 0; i < expected.getBulkFailures().size(); i++) {
|
||||
Failure expectedFailure = expected.getBulkFailures().get(i);
|
||||
|
@ -122,6 +130,11 @@ public class BulkByScrollResponseTests extends AbstractXContentTestCase<BulkBySc
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void assertEqualInstances(BulkByScrollResponse expected, BulkByScrollResponse actual) {
|
||||
assertEqualBulkResponse(expected, actual, includeUpdated, includeCreated);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected BulkByScrollResponse createTestInstance() {
|
||||
// failures are tested separately, so we can test XContent equivalence at least when we have no failures
|
||||
|
@ -141,4 +154,22 @@ public class BulkByScrollResponseTests extends AbstractXContentTestCase<BulkBySc
|
|||
protected boolean supportsUnknownFields() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ToXContent.Params getToXContentParams() {
|
||||
Map<String, String> params = new HashMap<>();
|
||||
if (randomBoolean()) {
|
||||
includeUpdated = false;
|
||||
params.put(Status.INCLUDE_UPDATED, "false");
|
||||
} else {
|
||||
includeUpdated = true;
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
includeCreated = false;
|
||||
params.put(Status.INCLUDE_CREATED, "false");
|
||||
} else {
|
||||
includeCreated = true;
|
||||
}
|
||||
return new ToXContent.MapParams(params);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -55,11 +55,14 @@ public class BulkByScrollTaskStatusOrExceptionTests extends AbstractXContentTest
|
|||
return StatusOrException.fromXContent(parser);
|
||||
}
|
||||
|
||||
public static void assertEqualStatusOrException(StatusOrException expected, StatusOrException actual) {
|
||||
public static void assertEqualStatusOrException(StatusOrException expected, StatusOrException actual,
|
||||
boolean includeUpdated, boolean includeCreated) {
|
||||
if (expected != null && actual != null) {
|
||||
assertNotSame(expected, actual);
|
||||
if (expected.getException() == null) {
|
||||
BulkByScrollTaskStatusTests.assertEqualStatus(expected.getStatus(), actual.getStatus());
|
||||
BulkByScrollTaskStatusTests
|
||||
// we test includeCreated params in the Status tests
|
||||
.assertEqualStatus(expected.getStatus(), actual.getStatus(), includeUpdated, includeCreated);
|
||||
} else {
|
||||
assertThat(
|
||||
actual.getException().getMessage(),
|
||||
|
@ -74,7 +77,7 @@ public class BulkByScrollTaskStatusOrExceptionTests extends AbstractXContentTest
|
|||
|
||||
@Override
|
||||
protected void assertEqualInstances(StatusOrException expected, StatusOrException actual) {
|
||||
assertEqualStatusOrException(expected, actual);
|
||||
assertEqualStatusOrException(expected, actual, true, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -33,7 +33,9 @@ import org.hamcrest.Matchers;
|
|||
import org.elasticsearch.index.reindex.BulkByScrollTask.Status;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.IntStream;
|
||||
|
@ -44,6 +46,10 @@ import static org.apache.lucene.util.TestUtil.randomSimpleString;
|
|||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
public class BulkByScrollTaskStatusTests extends AbstractXContentTestCase<BulkByScrollTask.Status> {
|
||||
|
||||
private boolean includeUpdated;
|
||||
private boolean includeCreated;
|
||||
|
||||
public void testBulkByTaskStatus() throws IOException {
|
||||
BulkByScrollTask.Status status = randomStatus();
|
||||
BytesStreamOutput out = new BytesStreamOutput();
|
||||
|
@ -144,21 +150,21 @@ public class BulkByScrollTaskStatusTests extends AbstractXContentTestCase<BulkBy
|
|||
);
|
||||
}
|
||||
|
||||
public static void assertEqualStatus(BulkByScrollTask.Status expected, BulkByScrollTask.Status actual) {
|
||||
public static void assertEqualStatus(BulkByScrollTask.Status expected, BulkByScrollTask.Status actual,
|
||||
boolean includeUpdated, boolean includeCreated) {
|
||||
assertNotSame(expected, actual);
|
||||
assertTrue(expected.equalsWithoutSliceStatus(actual));
|
||||
assertTrue(expected.equalsWithoutSliceStatus(actual, includeUpdated, includeCreated));
|
||||
assertThat(expected.getSliceStatuses().size(), equalTo(actual.getSliceStatuses().size()));
|
||||
for (int i = 0; i< expected.getSliceStatuses().size(); i++) {
|
||||
BulkByScrollTaskStatusOrExceptionTests.assertEqualStatusOrException(
|
||||
expected.getSliceStatuses().get(i),
|
||||
actual.getSliceStatuses().get(i)
|
||||
expected.getSliceStatuses().get(i), actual.getSliceStatuses().get(i), includeUpdated, includeCreated
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void assertEqualInstances(BulkByScrollTask.Status first, BulkByScrollTask.Status second) {
|
||||
assertEqualStatus(first, second);
|
||||
assertEqualStatus(first, second, includeUpdated, includeCreated);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -193,4 +199,22 @@ public class BulkByScrollTaskStatusTests extends AbstractXContentTestCase<BulkBy
|
|||
getRandomFieldsExcludeFilter(), this::createParser, this::doParseInstance,
|
||||
this::assertEqualInstances, assertToXContentEquivalence, ToXContent.EMPTY_PARAMS);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ToXContent.Params getToXContentParams() {
|
||||
Map<String, String> params = new HashMap<>();
|
||||
if (randomBoolean()) {
|
||||
includeUpdated = false;
|
||||
params.put(Status.INCLUDE_UPDATED, "false");
|
||||
} else {
|
||||
includeUpdated = true;
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
includeCreated = false;
|
||||
params.put(Status.INCLUDE_CREATED, "false");
|
||||
} else {
|
||||
includeCreated = true;
|
||||
}
|
||||
return new ToXContent.MapParams(params);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
|
||||
package org.elasticsearch.index.reindex;
|
||||
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
|
||||
import static org.apache.lucene.util.TestUtil.randomSimpleString;
|
||||
|
@ -32,11 +31,11 @@ public class UpdateByQueryRequestTests extends AbstractBulkByScrollRequestTestCa
|
|||
indices[i] = randomSimpleString(random(), 1, 30);
|
||||
}
|
||||
|
||||
SearchRequest searchRequest = new SearchRequest(indices);
|
||||
IndicesOptions indicesOptions = IndicesOptions.fromOptions(randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean());
|
||||
searchRequest.indicesOptions(indicesOptions);
|
||||
|
||||
UpdateByQueryRequest request = new UpdateByQueryRequest(searchRequest);
|
||||
UpdateByQueryRequest request = new UpdateByQueryRequest();
|
||||
request.indices(indices);
|
||||
request.setIndicesOptions(indicesOptions);
|
||||
for (int i = 0; i < numIndices; i++) {
|
||||
assertEquals(indices[i], request.indices()[i]);
|
||||
}
|
||||
|
@ -60,7 +59,7 @@ public class UpdateByQueryRequestTests extends AbstractBulkByScrollRequestTestCa
|
|||
|
||||
@Override
|
||||
protected UpdateByQueryRequest newRequest() {
|
||||
return new UpdateByQueryRequest(new SearchRequest(randomAlphaOfLength(5)));
|
||||
return new UpdateByQueryRequest(randomAlphaOfLength(5));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue