Implement ctx.op = "delete" on _update_by_query and _reindex

closes #18043
This commit is contained in:
Tanguy Leroux 2016-05-27 16:34:53 +02:00
parent 4ca04d6f6c
commit a1172d816c
11 changed files with 468 additions and 31 deletions

View File

@ -30,6 +30,7 @@ That will return something like this:
"timed_out": false,
"created": 120,
"updated": 0,
"deleted": 0,
"batches": 1,
"version_conflicts": 0,
"noops": 0,
@ -244,6 +245,24 @@ POST _reindex
// CONSOLE
// TEST[setup:twitter]
Just as in `_update_by_query`, you can set `ctx.op` to change the
operation that is executed on the destination index:
`noop`::
Set `ctx.op = "noop"` if your script decides that the document doesn't have
to be indexed in the destination index. This no-op will be reported
in the `noops` counter in the <<docs-reindex-response-body, response body>>.
`delete`::
Set `ctx.op = "delete"` if your script decides that the document must be
deleted from the destination index. The deletion will be reported in the
`deleted` counter in the <<docs-reindex-response-body, response body>>.
Setting `ctx.op` to anything else is an error. Setting any
other field in `ctx` is an error.
Think of the possibilities! Just be careful! With great power.... You can
change:
@ -377,6 +396,7 @@ starting the next set. This is "bursty" instead of "smooth". The default is
`unlimited` which is also the only non-number value that it accepts.
[float]
[[docs-reindex-response-body]]
=== Response body
The JSON response looks like this:

View File

@ -23,6 +23,7 @@ That will return something like this:
"took" : 147,
"timed_out": false,
"updated": 120,
"deleted": 0,
"batches": 1,
"version_conflicts": 0,
"noops": 0,
@ -115,11 +116,24 @@ POST twitter/_update_by_query
// CONSOLE
// TEST[setup:twitter]
Just as in <<docs-update,Update API>> you can set `ctx.op = "noop"` if
your script decides that it doesn't have to make any changes. That will cause
`_update_by_query` to omit that document from its updates. Setting `ctx.op` to
anything else is an error. If you want to delete by a query you can use the
<<docs-delete-by-query,Delete By Query API>> instead. Setting any
Just as in <<docs-update,Update API>> you can set `ctx.op` to change the
operation that is executed:
`noop`::
Set `ctx.op = "noop"` if your script decides that it doesn't have to make any
changes. That will cause `_update_by_query` to omit that document from its updates.
This no-op will be reported in the `noops` counter in the
<<docs-update-by-query-response-body, response body>>.
`delete`::
Set `ctx.op = "delete"` if your script decides that the document must be
deleted. The deletion will be reported in the `deleted` counter in the
<<docs-update-by-query-response-body, response body>>.
Setting `ctx.op` to anything else is an error. Setting any
other field in `ctx` is an error.
Note that we stopped specifying `conflicts=proceed`. In this case we want a
@ -212,6 +226,7 @@ starting the next set. This is "bursty" instead of "smooth". The default is
`unlimited` which is also the only non-number value that it accepts.
[float]
[[docs-update-by-query-response-body]]
=== Response body
The JSON response looks like this:

View File

@ -48,7 +48,9 @@ import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHitField;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiFunction;
@ -173,18 +175,30 @@ public abstract class AbstractAsyncBulkIndexByScrollAction<Request extends Abstr
void setIndex(String index);
String getIndex();
void setType(String type);
String getType();
void setId(String id);
String getId();
void setVersion(long version);
long getVersion();
void setVersionType(VersionType versionType);
void setParent(String parent);
String getParent();
void setRouting(String routing);
String getRouting();
void setTimestamp(String timestamp);
void setTtl(Long ttl);
@ -212,21 +226,41 @@ public abstract class AbstractAsyncBulkIndexByScrollAction<Request extends Abstr
request.index(index);
}
/** Returns the index of the wrapped request. */
@Override
public String getIndex() {
return request.index();
}
/** Sets the document type on the wrapped request. */
@Override
public void setType(String type) {
request.type(type);
}
/** Returns the document type of the wrapped request. */
@Override
public String getType() {
return request.type();
}
/** Sets the document id on the wrapped request. */
@Override
public void setId(String id) {
request.id(id);
}
/** Returns the document id of the wrapped request. */
@Override
public String getId() {
return request.id();
}
/** Sets the version on the wrapped request. */
@Override
public void setVersion(long version) {
request.version(version);
}
/** Returns the version of the wrapped request. */
@Override
public long getVersion() {
return request.version();
}
@Override
public void setVersionType(VersionType versionType) {
request.versionType(versionType);
@ -237,11 +271,21 @@ public abstract class AbstractAsyncBulkIndexByScrollAction<Request extends Abstr
request.parent(parent);
}
/** Returns the parent of the wrapped request. */
@Override
public String getParent() {
return request.parent();
}
/** Sets the routing on the wrapped request. */
@Override
public void setRouting(String routing) {
request.routing(routing);
}
/** Returns the routing of the wrapped request. */
@Override
public String getRouting() {
return request.routing();
}
@Override
public void setTimestamp(String timestamp) {
request.timestamp(timestamp);
@ -295,21 +339,41 @@ public abstract class AbstractAsyncBulkIndexByScrollAction<Request extends Abstr
request.index(index);
}
/** Returns the index of the wrapped request. */
@Override
public String getIndex() {
return request.index();
}
/** Sets the document type on the wrapped request. */
@Override
public void setType(String type) {
request.type(type);
}
/** Returns the document type of the wrapped request. */
@Override
public String getType() {
return request.type();
}
/** Sets the document id on the wrapped request. */
@Override
public void setId(String id) {
request.id(id);
}
/** Returns the document id of the wrapped request. */
@Override
public String getId() {
return request.id();
}
/** Sets the version on the wrapped request. */
@Override
public void setVersion(long version) {
request.version(version);
}
/** Returns the version of the wrapped request. */
@Override
public long getVersion() {
return request.version();
}
@Override
public void setVersionType(VersionType versionType) {
request.versionType(versionType);
@ -320,11 +384,21 @@ public abstract class AbstractAsyncBulkIndexByScrollAction<Request extends Abstr
request.parent(parent);
}
/** Returns the parent of the wrapped request. */
@Override
public String getParent() {
return request.parent();
}
/** Sets the routing on the wrapped request. */
@Override
public void setRouting(String routing) {
request.routing(routing);
}
/** Returns the routing of the wrapped request. */
@Override
public String getRouting() {
return request.routing();
}
@Override
public void setTimestamp(String timestamp) {
throw new UnsupportedOperationException("unable to set [timestamp] on action request [" + request.getClass() + "]");
@ -409,21 +483,17 @@ public abstract class AbstractAsyncBulkIndexByScrollAction<Request extends Abstr
Long oldTTL = fieldValue(doc, TTLFieldMapper.NAME);
context.put(TTLFieldMapper.NAME, oldTTL);
context.put(SourceFieldMapper.NAME, request.getSource());
context.put("op", "update");
OpType oldOpType = OpType.INDEX;
context.put("op", oldOpType.toString());
executable.setNextVar("ctx", context);
executable.run();
Map<String, Object> resultCtx = (Map<String, Object>) executable.unwrap(context);
String newOp = (String) resultCtx.remove("op");
if (newOp == null) {
throw new IllegalArgumentException("Script cleared op!");
}
if ("noop".equals(newOp)) {
task.countNoop();
return null;
}
if (false == "update".equals(newOp)) {
throw new IllegalArgumentException("Invalid op [" + newOp + ']');
throw new IllegalArgumentException("Script cleared operation type");
}
/*
@ -468,12 +538,35 @@ public abstract class AbstractAsyncBulkIndexByScrollAction<Request extends Abstr
if (false == Objects.equals(oldTTL, newValue)) {
scriptChangedTTL(request, newValue);
}
OpType newOpType = OpType.fromString(newOp);
if (newOpType != oldOpType) {
return scriptChangedOpType(request, oldOpType, newOpType);
}
if (false == context.isEmpty()) {
throw new IllegalArgumentException("Invalid fields added to context [" + String.join(",", context.keySet()) + ']');
}
return request;
}
/**
 * Reacts to a script having replaced the operation type of a request.
 *
 * @param request   the wrapped request the script was applied to
 * @param oldOpType the operation type before the script ran (only used in the error message)
 * @param newOpType the operation type the script asked for
 * @return {@code null} for {@link OpType#NOOP} (after counting the noop on the task), or a wrapped
 *         {@link DeleteRequest} for {@link OpType#DELETE}
 * @throws IllegalArgumentException if the new operation type is neither noop nor delete
 */
protected RequestWrapper<?> scriptChangedOpType(RequestWrapper<?> request, OpType oldOpType, OpType newOpType) {
    switch (newOpType) {
        case DELETE: {
            // Build a delete for the same index/type/id and carry over version, parent and routing.
            DeleteRequest deleteRequest = new DeleteRequest(request.getIndex(), request.getType(), request.getId());
            RequestWrapper<DeleteRequest> wrappedDelete = wrap(deleteRequest);
            wrappedDelete.setVersion(request.getVersion());
            wrappedDelete.setVersionType(VersionType.INTERNAL);
            wrappedDelete.setParent(request.getParent());
            wrappedDelete.setRouting(request.getRouting());
            return wrappedDelete;
        }
        case NOOP:
            // Nothing to send for this document; just record it.
            task.countNoop();
            return null;
        default:
            throw new IllegalArgumentException("Unsupported operation type change from [" + oldOpType + "] to [" + newOpType + "]");
    }
}
protected abstract void scriptChangedIndex(RequestWrapper<?> request, Object to);
protected abstract void scriptChangedType(RequestWrapper<?> request, Object to);
@ -489,5 +582,39 @@ public abstract class AbstractAsyncBulkIndexByScrollAction<Request extends Abstr
protected abstract void scriptChangedTimestamp(RequestWrapper<?> request, Object to);
protected abstract void scriptChangedTTL(RequestWrapper<?> request, Object to);
}
/**
 * Operation types a script may select through {@code ctx.op}.
 * Each constant carries the lowercase identifier used in scripts and in error messages.
 */
public enum OpType {

    NOOP("noop"),
    INDEX("index"),
    DELETE("delete");

    /** Lowercase identifier of this operation type. */
    private final String id;

    OpType(String id) {
        this.id = id;
    }

    /**
     * Parses an operation type from its identifier, ignoring case.
     *
     * @param opType the identifier to parse, e.g. {@code "delete"}
     * @return the matching operation type
     * @throws IllegalArgumentException if {@code opType} is {@code null} or does not match any constant
     */
    public static OpType fromString(String opType) {
        if (opType == null) {
            // Report null the same way as any other invalid value instead of failing with a bare NPE.
            throw new IllegalArgumentException("Operation type must not be null, only " +
                    Arrays.toString(values()) + " are allowed");
        }
        String lowerOpType = opType.toLowerCase(Locale.ROOT);
        // Match against the ids declared on the constants so a new constant is parseable automatically.
        for (OpType candidate : values()) {
            if (candidate.id.equals(lowerOpType)) {
                return candidate;
            }
        }
        throw new IllegalArgumentException("Operation type [" + lowerOpType + "] not allowed, only " +
                Arrays.toString(values()) + " are allowed");
    }

    /** Returns the identifier of this operation type; all ids are declared in lowercase. */
    @Override
    public String toString() {
        return id;
    }
}
}

View File

@ -64,7 +64,7 @@ public abstract class AbstractBaseReindexRestHandler<
}
protected void handleRequest(RestRequest request, RestChannel channel,
boolean includeCreated, boolean includeUpdated, boolean includeDeleted) throws IOException {
boolean includeCreated, boolean includeUpdated) throws IOException {
// Build the internal request
Request internal = setCommonOptions(request, buildRequest(request));
@ -74,7 +74,6 @@ public abstract class AbstractBaseReindexRestHandler<
Map<String, String> params = new HashMap<>();
params.put(BulkByScrollTask.Status.INCLUDE_CREATED, Boolean.toString(includeCreated));
params.put(BulkByScrollTask.Status.INCLUDE_UPDATED, Boolean.toString(includeUpdated));
params.put(BulkByScrollTask.Status.INCLUDE_DELETED, Boolean.toString(includeDeleted));
action.execute(internal, new BulkIndexByScrollResponseContentListener<>(channel, params));
return;

View File

@ -126,12 +126,6 @@ public class BulkByScrollTask extends CancellableTask {
*/
public static final String INCLUDE_UPDATED = "include_updated";
/**
* XContent param name to indicate if "deleted" count must be included
* in the response.
*/
public static final String INCLUDE_DELETED = "include_deleted";
private final long total;
private final long updated;
private final long created;
@ -213,9 +207,7 @@ public class BulkByScrollTask extends CancellableTask {
if (params.paramAsBoolean(INCLUDE_CREATED, true)) {
builder.field("created", created);
}
if (params.paramAsBoolean(INCLUDE_DELETED, true)) {
builder.field("deleted", deleted);
}
builder.field("deleted", deleted);
builder.field("batches", batches);
builder.field("version_conflicts", versionConflicts);
builder.field("noops", noops);

View File

@ -55,7 +55,7 @@ public class RestDeleteByQueryAction extends AbstractBulkByQueryRestHandler<Dele
if (false == request.hasContent()) {
throw new ElasticsearchException("_delete_by_query requires a request body");
}
handleRequest(request, channel, false, false, true);
handleRequest(request, channel, false, false);
}
@Override

View File

@ -115,7 +115,7 @@ public class RestReindexAction extends AbstractBaseReindexRestHandler<ReindexReq
if (false == request.hasContent()) {
throw new ElasticsearchException("_reindex requires a request body");
}
handleRequest(request, channel, true, true, false);
handleRequest(request, channel, true, true);
}
@Override

View File

@ -52,7 +52,7 @@ public class RestUpdateByQueryAction extends AbstractBulkByQueryRestHandler<Upda
@Override
protected void handleRequest(RestRequest request, RestChannel channel, Client client) throws Exception {
handleRequest(request, channel, false, true, false);
handleRequest(request, channel, false, true);
}
@Override

View File

@ -19,9 +19,13 @@
package org.elasticsearch.index.reindex;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.reindex.AbstractAsyncBulkIndexByScrollAction.OpType;
import org.elasticsearch.index.reindex.AbstractAsyncBulkIndexByScrollAction.RequestWrapper;
import org.elasticsearch.script.CompiledScript;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.Script;
@ -56,7 +60,8 @@ public abstract class AbstractAsyncBulkIndexByScrollActionScriptTestCase<
scriptService = mock(ScriptService.class);
}
protected IndexRequest applyScript(Consumer<Map<String, Object>> scriptBody) {
@SuppressWarnings("unchecked")
protected <T extends ActionRequest<?>> T applyScript(Consumer<Map<String, Object>> scriptBody) {
IndexRequest index = new IndexRequest("index", "type", "1").source(singletonMap("foo", "bar"));
Map<String, SearchHitField> fields = new HashMap<>();
InternalSearchHit doc = new InternalSearchHit(0, "id", new Text("type"), fields);
@ -66,8 +71,8 @@ public abstract class AbstractAsyncBulkIndexByScrollActionScriptTestCase<
when(scriptService.executable(any(CompiledScript.class), Matchers.<Map<String, Object>>any()))
.thenReturn(executableScript);
AbstractAsyncBulkIndexByScrollAction<Request> action = action(scriptService, request().setScript(EMPTY_SCRIPT));
action.buildScriptApplier().apply(AbstractAsyncBulkIndexByScrollAction.wrap(index), doc);
return index;
RequestWrapper<?> result = action.buildScriptApplier().apply(AbstractAsyncBulkIndexByScrollAction.wrap(index), doc);
return (result != null) ? (T) result.self() : null;
}
public void testScriptAddingJunkToCtxIsError() {
@ -88,5 +93,24 @@ public abstract class AbstractAsyncBulkIndexByScrollActionScriptTestCase<
assertEquals("cat", index.sourceAsMap().get("bar"));
}
/**
 * A script that switches {@code ctx.op} to noop must produce no request and
 * raise the task's noop count from 0 to 1.
 */
public void testSetOpTypeNoop() throws Exception {
    Consumer<Map<String, Object>> markAsNoop = ctx -> ctx.put("op", OpType.NOOP.toString());
    assertThat(task.getStatus().getNoops(), equalTo(0L));
    assertNull(applyScript(markAsNoop));
    assertThat(task.getStatus().getNoops(), equalTo(1L));
}
/**
 * A script that switches {@code ctx.op} to delete must yield a delete request
 * addressing the same index, type and id as the request it was applied to.
 */
public void testSetOpTypeDelete() throws Exception {
    Consumer<Map<String, Object>> markAsDelete = ctx -> ctx.put("op", OpType.DELETE.toString());
    DeleteRequest deletion = applyScript(markAsDelete);
    assertThat(deletion.index(), equalTo("index"));
    assertThat(deletion.type(), equalTo("type"));
    assertThat(deletion.id(), equalTo("1"));
}
/**
 * A script that sets {@code ctx.op} to an unrecognised value must fail with an
 * {@link IllegalArgumentException} listing the allowed operation types.
 */
public void testSetOpTypeUnknown() throws Exception {
    Consumer<Map<String, Object>> setBogusOp = ctx -> ctx.put("op", "unknown");
    IllegalArgumentException failure = expectThrows(IllegalArgumentException.class, () -> applyScript(setBogusOp));
    assertThat(failure.getMessage(), equalTo("Operation type [unknown] not allowed, only [noop, index, delete] are allowed"));
}
protected abstract AbstractAsyncBulkIndexByScrollAction<Request> action(ScriptService scriptService, Request request);
}

View File

@ -284,6 +284,11 @@
user: notfoo
- match: { hits.total: 0 }
- do:
count:
index: new_twitter
- match: {count: 1}
---
"Noop all docs":
- do:
@ -313,6 +318,11 @@
- match: {updated: 0}
- match: {noops: 2}
- do:
indices.exists:
index: new_twitter
- is_false: ''
---
"Set version to null to force an update":
- do:
@ -443,3 +453,85 @@
match:
user: another
- match: { hits.total: 1 }
---
"Reindex all docs with one doc deletion":
# Source index
- do:
index:
index: index1
type: type1
id: 1
body: { "lang": "en", "id": 123 }
- do:
index:
index: index1
type: type1
id: 2
body: { "lang": "en", "id": 456 }
- do:
index:
index: index1
type: type1
id: 3
body: { "lang": "fr", "id": 789 }
# Destination index
- do:
index:
index: index2
type: type2
id: fr_789
body: { "lang": "fr", "id": 789 }
- do:
index:
index: index2
type: type2
id: en_123
body: { "lang": "en", "id": 123 }
- do:
indices.refresh: {}
# Reindex all documents from "index1" into "index2", changing their type
# to "type2" and their id to the concatenated lang+id fields,
# trashing all non-english pre existing ones
- do:
reindex:
refresh: true
body:
source:
index: index1
dest:
index: index2
type: type2
script:
inline: "ctx._id = ctx._source.lang + '_' + ctx._source.id;
if (ctx._source.lang != \"en\" ) {ctx.op = 'delete'}"
- match: {created: 1}
- match: {noops: 0}
- match: {updated: 1}
- match: {deleted: 1}
- do:
mget:
body:
docs:
- { _index: index2, _type: type2, _id: en_123}
- { _index: index2, _type: type2, _id: en_456}
- { _index: index2, _type: type2, _id: fr_789}
- is_true: docs.0.found
- match: { docs.0._index: index2 }
- match: { docs.0._type: type2 }
- match: { docs.0._id: en_123 }
- match: { docs.0._version: 2 }
- is_true: docs.1.found
- match: { docs.1._index: index2 }
- match: { docs.1._type: type2 }
- match: { docs.1._id: en_456 }
- match: { docs.1._version: 1 }
- is_false: docs.2.found
- match: { docs.2._index: index2 }
- match: { docs.2._type: type2 }
- match: { docs.2._id: fr_789 }

View File

@ -138,3 +138,171 @@
body:
script:
inline: ctx._id = "stuff"
---
"Update all docs with one doc deletion":
- do:
index:
index: twitter
type: tweet
id: 1
body: { "level": 9, "last_updated": "2016-01-01T12:10:30Z" }
- do:
index:
index: twitter
type: tweet
id: 2
body: { "level": 10, "last_updated": "2016-01-01T12:10:30Z" }
- do:
index:
index: twitter
type: tweet
id: 3
body: { "level": 11, "last_updated": "2016-01-01T12:10:30Z" }
- do:
index:
index: twitter
type: tweet
id: 4
body: { "level": 12, "last_updated": "2016-01-01T12:10:30Z" }
- do:
indices.refresh: {}
- do:
update_by_query:
refresh: true
index: twitter
body:
script:
inline: if (ctx._source.level != 11) {ctx._source.last_updated = "2016-01-02T00:00:00Z"} else {ctx.op = "delete"}
- match: {updated: 3}
- match: {deleted: 1}
- match: {noops: 0}
- do:
search:
index: twitter
body:
query:
match:
last_updated: "2016-01-02T00:00:00Z"
- match: { hits.total: 3 }
- do:
search:
index: twitter
body:
query:
term:
level: 11
- match: { hits.total: 0 }
- do:
count:
index: twitter
- match: {count: 3}
---
"Update all docs with one deletion and one noop":
- do:
index:
index: twitter
type: tweet
id: 1
body: { "level": 9, "last_updated": "2016-01-01T12:10:30Z" }
- do:
index:
index: twitter
type: tweet
id: 2
body: { "level": 10, "last_updated": "2016-01-01T12:10:30Z" }
- do:
index:
index: twitter
type: tweet
id: 3
body: { "level": 11, "last_updated": "2016-01-01T12:10:30Z" }
- do:
index:
index: twitter
type: tweet
id: 4
body: { "level": 12, "last_updated": "2016-01-01T12:10:30Z" }
- do:
indices.refresh: {}
- do:
update_by_query:
refresh: true
index: twitter
body:
script:
inline: "switch (ctx._source.level % 3) {
case 0:
ctx._source.last_updated = \"2016-01-02T00:00:00Z\";
break;
case 1:
ctx.op = \"noop\";
break;
case 2:
ctx.op = \"delete\";
break;
}"
- match: {updated: 2}
- match: {deleted: 1}
- match: {noops: 1}
- do:
search:
index: twitter
body:
query:
match:
last_updated: "2016-01-02T00:00:00Z"
- match: { hits.total: 2 }
- do:
search:
index: twitter
body:
query:
match:
last_updated: "2016-01-01T12:10:30Z"
- match: { hits.total: 1 }
- do:
search:
index: twitter
body:
query:
term:
level: 11
- match: { hits.total: 0 }
---
"Set unsupported operation type":
- do:
index:
index: twitter
type: tweet
id: 1
body: { "user": "kimchy" }
- do:
index:
index: twitter
type: tweet
id: 2
body: { "user": "foo" }
- do:
indices.refresh: {}
- do:
catch: request
update_by_query:
refresh: true
index: twitter
body:
script:
inline: if (ctx._source.user == "kimchy") {ctx.op = "update"} else {ctx.op = "junk"}
- match: { error.reason: 'Operation type [junk] not allowed, only [noop, index, delete] are allowed' }