Refactor UpdateHelper into unit-testable pieces
This starts breaking up the `UpdateHelper.prepare` method so that each piece can be individually unit tested. No actual functionality has changed. Note however, that I did add a TODO about `ctx.op` leniency, which I'd love to remove as a separate PR if desired.
This commit is contained in:
parent
0ec30eb8e0
commit
1907c46689
|
@ -177,7 +177,6 @@
|
||||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]termvectors[/\\]TransportShardMultiTermsVectorAction.java" checks="LineLength" />
|
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]termvectors[/\\]TransportShardMultiTermsVectorAction.java" checks="LineLength" />
|
||||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]termvectors[/\\]TransportTermVectorsAction.java" checks="LineLength" />
|
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]termvectors[/\\]TransportTermVectorsAction.java" checks="LineLength" />
|
||||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]update[/\\]TransportUpdateAction.java" checks="LineLength" />
|
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]update[/\\]TransportUpdateAction.java" checks="LineLength" />
|
||||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]update[/\\]UpdateHelper.java" checks="LineLength" />
|
|
||||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]update[/\\]UpdateRequest.java" checks="LineLength" />
|
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]update[/\\]UpdateRequest.java" checks="LineLength" />
|
||||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]update[/\\]UpdateRequestBuilder.java" checks="LineLength" />
|
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]update[/\\]UpdateRequestBuilder.java" checks="LineLength" />
|
||||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]bootstrap[/\\]Bootstrap.java" checks="LineLength" />
|
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]bootstrap[/\\]Bootstrap.java" checks="LineLength" />
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
|
|
||||||
package org.elasticsearch.action.update;
|
package org.elasticsearch.action.update;
|
||||||
|
|
||||||
|
import org.apache.logging.log4j.Logger;
|
||||||
import org.elasticsearch.ElasticsearchException;
|
import org.elasticsearch.ElasticsearchException;
|
||||||
import org.elasticsearch.action.DocWriteResponse;
|
import org.elasticsearch.action.DocWriteResponse;
|
||||||
import org.elasticsearch.action.delete.DeleteRequest;
|
import org.elasticsearch.action.delete.DeleteRequest;
|
||||||
|
@ -79,11 +80,56 @@ public class UpdateHelper extends AbstractComponent {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Prepares an update request by converting it into an index or delete request or an update response (no action).
|
* Prepares an update request by converting it into an index or delete request or an update response (no action, in the event of a
|
||||||
|
* noop).
|
||||||
*/
|
*/
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
protected Result prepare(ShardId shardId, UpdateRequest request, final GetResult getResult, LongSupplier nowInMillis) {
|
protected Result prepare(ShardId shardId, UpdateRequest request, final GetResult getResult, LongSupplier nowInMillis) {
|
||||||
if (!getResult.isExists()) {
|
if (getResult.isExists() == false) {
|
||||||
|
// If the document didn't exist, execute the update request as an upsert
|
||||||
|
return prepareUpsert(shardId, request, getResult, nowInMillis);
|
||||||
|
} else if (getResult.internalSourceRef() == null) {
|
||||||
|
// no source, we can't do anything, throw a failure...
|
||||||
|
throw new DocumentSourceMissingException(shardId, request.type(), request.id());
|
||||||
|
} else if (request.script() == null && request.doc() != null) {
|
||||||
|
// The request has no script, it is a new doc that should be merged with the old document
|
||||||
|
return prepareUpdateIndexRequest(shardId, request, getResult, request.detectNoop());
|
||||||
|
} else {
|
||||||
|
// The request has a script (or empty script), execute the script and prepare a new index request
|
||||||
|
return prepareUpdateScriptRequest(shardId, request, getResult, nowInMillis);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Execute a scripted upsert, where there is an existing upsert document and a script to be executed. The script is executed and a new
|
||||||
|
* Tuple of operation and updated {@code _source} is returned.
|
||||||
|
*/
|
||||||
|
Tuple<UpdateOpType, Map<String, Object>> executeScriptedUpsert(IndexRequest upsert, Script script, LongSupplier nowInMillis) {
|
||||||
|
Map<String, Object> upsertDoc = upsert.sourceAsMap();
|
||||||
|
Map<String, Object> ctx = new HashMap<>(3);
|
||||||
|
// Tell the script that this is a create and not an update
|
||||||
|
ctx.put(ContextFields.OP, UpdateOpType.CREATE.toString());
|
||||||
|
ctx.put(ContextFields.SOURCE, upsertDoc);
|
||||||
|
ctx.put(ContextFields.NOW, nowInMillis.getAsLong());
|
||||||
|
ctx = executeScript(script, ctx);
|
||||||
|
|
||||||
|
UpdateOpType operation = UpdateOpType.lenientFromString((String) ctx.get(ContextFields.OP), logger, script.getIdOrCode());
|
||||||
|
Map newSource = (Map) ctx.get(ContextFields.SOURCE);
|
||||||
|
|
||||||
|
if (operation != UpdateOpType.CREATE && operation != UpdateOpType.NONE) {
|
||||||
|
// Only valid options for an upsert script are "create" (the default) or "none", meaning abort upsert
|
||||||
|
logger.warn("Invalid upsert operation [{}] for script [{}], doing nothing...", operation, script.getIdOrCode());
|
||||||
|
operation = UpdateOpType.NONE;
|
||||||
|
}
|
||||||
|
|
||||||
|
return new Tuple<>(operation, newSource);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Prepare the request for upsert, executing the upsert script if present, and returning a {@code Result} containing a new
|
||||||
|
* {@code IndexRequest} to be executed on the primary and replicas.
|
||||||
|
*/
|
||||||
|
Result prepareUpsert(ShardId shardId, UpdateRequest request, final GetResult getResult, LongSupplier nowInMillis) {
|
||||||
if (request.upsertRequest() == null && !request.docAsUpsert()) {
|
if (request.upsertRequest() == null && !request.docAsUpsert()) {
|
||||||
throw new DocumentMissingException(shardId, request.type(), request.id());
|
throw new DocumentMissingException(shardId, request.type(), request.id());
|
||||||
}
|
}
|
||||||
|
@ -91,123 +137,164 @@ public class UpdateHelper extends AbstractComponent {
|
||||||
if (request.scriptedUpsert() && request.script() != null) {
|
if (request.scriptedUpsert() && request.script() != null) {
|
||||||
// Run the script to perform the create logic
|
// Run the script to perform the create logic
|
||||||
IndexRequest upsert = request.upsertRequest();
|
IndexRequest upsert = request.upsertRequest();
|
||||||
Map<String, Object> upsertDoc = upsert.sourceAsMap();
|
Tuple<UpdateOpType, Map<String, Object>> upsertResult = executeScriptedUpsert(upsert, request.script, nowInMillis);
|
||||||
Map<String, Object> ctx = new HashMap<>(2);
|
switch (upsertResult.v1()) {
|
||||||
// Tell the script that this is a create and not an update
|
case CREATE:
|
||||||
ctx.put("op", "create");
|
// Update the index request with the new "_source"
|
||||||
ctx.put("_source", upsertDoc);
|
indexRequest.source(upsertResult.v2());
|
||||||
ctx.put("_now", nowInMillis.getAsLong());
|
break;
|
||||||
ctx = executeScript(request.script, ctx);
|
case NONE:
|
||||||
|
UpdateResponse update = new UpdateResponse(shardId, getResult.getType(), getResult.getId(),
|
||||||
//Allow the script to abort the create by setting "op" to "none"
|
getResult.getVersion(), DocWriteResponse.Result.NOOP);
|
||||||
String scriptOpChoice = (String) ctx.get("op");
|
update.setGetResult(getResult);
|
||||||
|
return new Result(update, DocWriteResponse.Result.NOOP, upsertResult.v2(), XContentType.JSON);
|
||||||
// Only valid options for an upsert script are "create"
|
default:
|
||||||
// (the default) or "none", meaning abort upsert
|
// It's fine to throw an exception here, the leniency is handled/logged by `executeScriptedUpsert`
|
||||||
if (!"create".equals(scriptOpChoice)) {
|
throw new IllegalArgumentException("unknown upsert operation, got: " + upsertResult.v1());
|
||||||
if (!"none".equals(scriptOpChoice)) {
|
|
||||||
logger.warn("Used upsert operation [{}] for script [{}], doing nothing...", scriptOpChoice,
|
|
||||||
request.script.getIdOrCode());
|
|
||||||
}
|
|
||||||
UpdateResponse update = new UpdateResponse(shardId, getResult.getType(), getResult.getId(),
|
|
||||||
getResult.getVersion(), DocWriteResponse.Result.NOOP);
|
|
||||||
update.setGetResult(getResult);
|
|
||||||
return new Result(update, DocWriteResponse.Result.NOOP, upsertDoc, XContentType.JSON);
|
|
||||||
}
|
}
|
||||||
indexRequest.source((Map) ctx.get("_source"));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
indexRequest.index(request.index()).type(request.type()).id(request.id())
|
indexRequest.index(request.index())
|
||||||
|
.type(request.type()).id(request.id()).setRefreshPolicy(request.getRefreshPolicy()).routing(request.routing())
|
||||||
|
.parent(request.parent()).timeout(request.timeout()).waitForActiveShards(request.waitForActiveShards())
|
||||||
// it has to be a "create!"
|
// it has to be a "create!"
|
||||||
.create(true)
|
.create(true);
|
||||||
.setRefreshPolicy(request.getRefreshPolicy())
|
|
||||||
.routing(request.routing())
|
|
||||||
.parent(request.parent())
|
|
||||||
.timeout(request.timeout())
|
|
||||||
.waitForActiveShards(request.waitForActiveShards());
|
|
||||||
if (request.versionType() != VersionType.INTERNAL) {
|
if (request.versionType() != VersionType.INTERNAL) {
|
||||||
// in all but the internal versioning mode, we want to create the new document using the given version.
|
// in all but the internal versioning mode, we want to create the new document using the given version.
|
||||||
indexRequest.version(request.version()).versionType(request.versionType());
|
indexRequest.version(request.version()).versionType(request.versionType());
|
||||||
}
|
}
|
||||||
|
|
||||||
return new Result(indexRequest, DocWriteResponse.Result.CREATED, null, null);
|
return new Result(indexRequest, DocWriteResponse.Result.CREATED, null, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
long updateVersion = getResult.getVersion();
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Calculate the version to use for the update request, using either the existing version if internal versioning is used, or the get
|
||||||
|
* result document's version if the version type is "FORCE".
|
||||||
|
*/
|
||||||
|
static long calculateUpdateVersion(UpdateRequest request, GetResult getResult) {
|
||||||
if (request.versionType() != VersionType.INTERNAL) {
|
if (request.versionType() != VersionType.INTERNAL) {
|
||||||
assert request.versionType() == VersionType.FORCE;
|
assert request.versionType() == VersionType.FORCE;
|
||||||
updateVersion = request.version(); // remember, match_any is excluded by the conflict test
|
return request.version(); // remember, match_any is excluded by the conflict test
|
||||||
|
} else {
|
||||||
|
return getResult.getVersion();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (getResult.internalSourceRef() == null) {
|
/**
|
||||||
// no source, we can't do nothing, through a failure...
|
* Calculate a routing value to be used, either the included index request's routing, or retrieved document's routing when defined.
|
||||||
throw new DocumentSourceMissingException(shardId, request.type(), request.id());
|
*/
|
||||||
|
@Nullable
|
||||||
|
static String calculateRouting(GetResult getResult, @Nullable IndexRequest updateIndexRequest) {
|
||||||
|
if (updateIndexRequest != null && updateIndexRequest.routing() != null) {
|
||||||
|
return updateIndexRequest.routing();
|
||||||
|
} else if (getResult.getFields().containsKey(RoutingFieldMapper.NAME)) {
|
||||||
|
return getResult.field(RoutingFieldMapper.NAME).getValue().toString();
|
||||||
|
} else {
|
||||||
|
return null;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(getResult.internalSourceRef(), true);
|
/**
|
||||||
String operation = null;
|
* Calculate a parent value to be used, either the included index request's parent, or retrieved document's parent when defined.
|
||||||
final Map<String, Object> updatedSourceAsMap;
|
*/
|
||||||
|
@Nullable
|
||||||
|
static String calculateParent(GetResult getResult, @Nullable IndexRequest updateIndexRequest) {
|
||||||
|
if (updateIndexRequest != null && updateIndexRequest.parent() != null) {
|
||||||
|
return updateIndexRequest.parent();
|
||||||
|
} else if (getResult.getFields().containsKey(ParentFieldMapper.NAME)) {
|
||||||
|
return getResult.field(ParentFieldMapper.NAME).getValue().toString();
|
||||||
|
} else {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Prepare the request for merging the existing document with a new one, can optionally detect a noop change. Returns a {@code Result}
|
||||||
|
* containing a new {@code IndexRequest} to be executed on the primary and replicas.
|
||||||
|
*/
|
||||||
|
Result prepareUpdateIndexRequest(ShardId shardId, UpdateRequest request, GetResult getResult, boolean detectNoop) {
|
||||||
|
final long updateVersion = calculateUpdateVersion(request, getResult);
|
||||||
|
final IndexRequest currentRequest = request.doc();
|
||||||
|
final String routing = calculateRouting(getResult, currentRequest);
|
||||||
|
final String parent = calculateParent(getResult, currentRequest);
|
||||||
|
final Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(getResult.internalSourceRef(), true);
|
||||||
final XContentType updateSourceContentType = sourceAndContent.v1();
|
final XContentType updateSourceContentType = sourceAndContent.v1();
|
||||||
String routing = getResult.getFields().containsKey(RoutingFieldMapper.NAME) ? getResult.field(RoutingFieldMapper.NAME).getValue().toString() : null;
|
final Map<String, Object> updatedSourceAsMap = sourceAndContent.v2();
|
||||||
String parent = getResult.getFields().containsKey(ParentFieldMapper.NAME) ? getResult.field(ParentFieldMapper.NAME).getValue().toString() : null;
|
|
||||||
|
|
||||||
if (request.script() == null && request.doc() != null) {
|
final boolean noop = !XContentHelper.update(updatedSourceAsMap, currentRequest.sourceAsMap(), detectNoop);
|
||||||
IndexRequest indexRequest = request.doc();
|
|
||||||
updatedSourceAsMap = sourceAndContent.v2();
|
// We can only actually turn the update into a noop if detectNoop is true to preserve backwards compatibility and to handle cases
|
||||||
if (indexRequest.routing() != null) {
|
// where users repopulating multi-fields or adding synonyms, etc.
|
||||||
routing = indexRequest.routing();
|
if (detectNoop && noop) {
|
||||||
}
|
UpdateResponse update = new UpdateResponse(shardId, getResult.getType(), getResult.getId(),
|
||||||
if (indexRequest.parent() != null) {
|
getResult.getVersion(), DocWriteResponse.Result.NOOP);
|
||||||
parent = indexRequest.parent();
|
update.setGetResult(extractGetResult(request, request.index(), getResult.getVersion(), updatedSourceAsMap,
|
||||||
}
|
updateSourceContentType, getResult.internalSourceRef()));
|
||||||
boolean noop = !XContentHelper.update(updatedSourceAsMap, indexRequest.sourceAsMap(), request.detectNoop());
|
return new Result(update, DocWriteResponse.Result.NOOP, updatedSourceAsMap, updateSourceContentType);
|
||||||
// noop could still be true even if detectNoop isn't because update detects empty maps as noops. BUT we can only
|
|
||||||
// actually turn the update into a noop if detectNoop is true to preserve backwards compatibility and to handle
|
|
||||||
// cases where users repopulating multi-fields or adding synonyms, etc.
|
|
||||||
if (request.detectNoop() && noop) {
|
|
||||||
operation = "none";
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
Map<String, Object> ctx = new HashMap<>(16);
|
final IndexRequest finalIndexRequest = Requests.indexRequest(request.index())
|
||||||
ctx.put("_index", getResult.getIndex());
|
.type(request.type()).id(request.id()).routing(routing).parent(parent)
|
||||||
ctx.put("_type", getResult.getType());
|
.source(updatedSourceAsMap, updateSourceContentType).version(updateVersion).versionType(request.versionType())
|
||||||
ctx.put("_id", getResult.getId());
|
.waitForActiveShards(request.waitForActiveShards()).timeout(request.timeout())
|
||||||
ctx.put("_version", getResult.getVersion());
|
.setRefreshPolicy(request.getRefreshPolicy());
|
||||||
ctx.put("_routing", routing);
|
return new Result(finalIndexRequest, DocWriteResponse.Result.UPDATED, updatedSourceAsMap, updateSourceContentType);
|
||||||
ctx.put("_parent", parent);
|
|
||||||
ctx.put("_source", sourceAndContent.v2());
|
|
||||||
ctx.put("_now", nowInMillis.getAsLong());
|
|
||||||
|
|
||||||
ctx = executeScript(request.script, ctx);
|
|
||||||
|
|
||||||
operation = (String) ctx.get("op");
|
|
||||||
|
|
||||||
updatedSourceAsMap = (Map<String, Object>) ctx.get("_source");
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Prepare the request for updating an existing document using a script. Executes the script and returns a {@code Result} containing
|
||||||
|
* either a new {@code IndexRequest} or {@code DeleteRequest} (depending on the script's returned "op" value) to be executed on the
|
||||||
|
* primary and replicas.
|
||||||
|
*/
|
||||||
|
Result prepareUpdateScriptRequest(ShardId shardId, UpdateRequest request, GetResult getResult, LongSupplier nowInMillis) {
|
||||||
|
final long updateVersion = calculateUpdateVersion(request, getResult);
|
||||||
|
final IndexRequest currentRequest = request.doc();
|
||||||
|
final String routing = calculateRouting(getResult, currentRequest);
|
||||||
|
final String parent = calculateParent(getResult, currentRequest);
|
||||||
|
final Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(getResult.internalSourceRef(), true);
|
||||||
|
final XContentType updateSourceContentType = sourceAndContent.v1();
|
||||||
|
final Map<String, Object> sourceAsMap = sourceAndContent.v2();
|
||||||
|
|
||||||
|
Map<String, Object> ctx = new HashMap<>(16);
|
||||||
|
ctx.put(ContextFields.OP, UpdateOpType.INDEX.toString()); // The default operation is "index"
|
||||||
|
ctx.put(ContextFields.INDEX, getResult.getIndex());
|
||||||
|
ctx.put(ContextFields.TYPE, getResult.getType());
|
||||||
|
ctx.put(ContextFields.ID, getResult.getId());
|
||||||
|
ctx.put(ContextFields.VERSION, getResult.getVersion());
|
||||||
|
ctx.put(ContextFields.ROUTING, routing);
|
||||||
|
ctx.put(ContextFields.PARENT, parent);
|
||||||
|
ctx.put(ContextFields.SOURCE, sourceAsMap);
|
||||||
|
ctx.put(ContextFields.NOW, nowInMillis.getAsLong());
|
||||||
|
|
||||||
|
ctx = executeScript(request.script, ctx);
|
||||||
|
|
||||||
|
UpdateOpType operation = UpdateOpType.lenientFromString((String) ctx.get(ContextFields.OP), logger, request.script.getIdOrCode());
|
||||||
|
|
||||||
|
final Map<String, Object> updatedSourceAsMap = (Map<String, Object>) ctx.get(ContextFields.SOURCE);
|
||||||
|
|
||||||
|
switch (operation) {
|
||||||
|
case INDEX:
|
||||||
|
final IndexRequest indexRequest = Requests.indexRequest(request.index())
|
||||||
|
.type(request.type()).id(request.id()).routing(routing).parent(parent)
|
||||||
|
.source(updatedSourceAsMap, updateSourceContentType).version(updateVersion).versionType(request.versionType())
|
||||||
|
.waitForActiveShards(request.waitForActiveShards()).timeout(request.timeout())
|
||||||
|
.setRefreshPolicy(request.getRefreshPolicy());
|
||||||
|
return new Result(indexRequest, DocWriteResponse.Result.UPDATED, updatedSourceAsMap, updateSourceContentType);
|
||||||
|
case DELETE:
|
||||||
|
DeleteRequest deleteRequest = Requests.deleteRequest(request.index())
|
||||||
|
.type(request.type()).id(request.id()).routing(routing).parent(parent)
|
||||||
|
.version(updateVersion).versionType(request.versionType()).waitForActiveShards(request.waitForActiveShards())
|
||||||
|
.timeout(request.timeout()).setRefreshPolicy(request.getRefreshPolicy());
|
||||||
|
return new Result(deleteRequest, DocWriteResponse.Result.DELETED, updatedSourceAsMap, updateSourceContentType);
|
||||||
|
default:
|
||||||
|
// If it was neither an INDEX or DELETE operation, treat it as a noop
|
||||||
|
UpdateResponse update = new UpdateResponse(shardId, getResult.getType(), getResult.getId(),
|
||||||
|
getResult.getVersion(), DocWriteResponse.Result.NOOP);
|
||||||
|
update.setGetResult(extractGetResult(request, request.index(), getResult.getVersion(), updatedSourceAsMap,
|
||||||
|
updateSourceContentType, getResult.internalSourceRef()));
|
||||||
|
return new Result(update, DocWriteResponse.Result.NOOP, updatedSourceAsMap, updateSourceContentType);
|
||||||
|
|
||||||
if (operation == null || "index".equals(operation)) {
|
|
||||||
final IndexRequest indexRequest = Requests.indexRequest(request.index()).type(request.type()).id(request.id()).routing(routing).parent(parent)
|
|
||||||
.source(updatedSourceAsMap, updateSourceContentType)
|
|
||||||
.version(updateVersion).versionType(request.versionType())
|
|
||||||
.waitForActiveShards(request.waitForActiveShards())
|
|
||||||
.timeout(request.timeout())
|
|
||||||
.setRefreshPolicy(request.getRefreshPolicy());
|
|
||||||
return new Result(indexRequest, DocWriteResponse.Result.UPDATED, updatedSourceAsMap, updateSourceContentType);
|
|
||||||
} else if ("delete".equals(operation)) {
|
|
||||||
DeleteRequest deleteRequest = Requests.deleteRequest(request.index()).type(request.type()).id(request.id()).routing(routing).parent(parent)
|
|
||||||
.version(updateVersion).versionType(request.versionType())
|
|
||||||
.waitForActiveShards(request.waitForActiveShards())
|
|
||||||
.timeout(request.timeout())
|
|
||||||
.setRefreshPolicy(request.getRefreshPolicy());
|
|
||||||
return new Result(deleteRequest, DocWriteResponse.Result.DELETED, updatedSourceAsMap, updateSourceContentType);
|
|
||||||
} else if ("none".equals(operation)) {
|
|
||||||
UpdateResponse update = new UpdateResponse(shardId, getResult.getType(), getResult.getId(), getResult.getVersion(), DocWriteResponse.Result.NOOP);
|
|
||||||
update.setGetResult(extractGetResult(request, request.index(), getResult.getVersion(), updatedSourceAsMap, updateSourceContentType, getResult.internalSourceRef()));
|
|
||||||
return new Result(update, DocWriteResponse.Result.NOOP, updatedSourceAsMap, updateSourceContentType);
|
|
||||||
} else {
|
|
||||||
logger.warn("Used update operation [{}] for script [{}], doing nothing...", operation, request.script.getIdOrCode());
|
|
||||||
UpdateResponse update = new UpdateResponse(shardId, getResult.getType(), getResult.getId(), getResult.getVersion(), DocWriteResponse.Result.NOOP);
|
|
||||||
return new Result(update, DocWriteResponse.Result.NOOP, updatedSourceAsMap, updateSourceContentType);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -216,7 +303,7 @@ public class UpdateHelper extends AbstractComponent {
|
||||||
if (scriptService != null) {
|
if (scriptService != null) {
|
||||||
CompiledScript compiledScript = scriptService.compile(script, ScriptContext.Standard.UPDATE);
|
CompiledScript compiledScript = scriptService.compile(script, ScriptContext.Standard.UPDATE);
|
||||||
ExecutableScript executableScript = scriptService.executable(compiledScript, script.getParams());
|
ExecutableScript executableScript = scriptService.executable(compiledScript, script.getParams());
|
||||||
executableScript.setNextVar("ctx", ctx);
|
executableScript.setNextVar(ContextFields.CTX, ctx);
|
||||||
executableScript.run();
|
executableScript.run();
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
@ -229,7 +316,8 @@ public class UpdateHelper extends AbstractComponent {
|
||||||
* Applies {@link UpdateRequest#fetchSource()} to the _source of the updated document to be returned in a update response.
|
* Applies {@link UpdateRequest#fetchSource()} to the _source of the updated document to be returned in a update response.
|
||||||
* For BWC this function also extracts the {@link UpdateRequest#fields()} from the updated document to be returned in a update response
|
* For BWC this function also extracts the {@link UpdateRequest#fields()} from the updated document to be returned in a update response
|
||||||
*/
|
*/
|
||||||
public GetResult extractGetResult(final UpdateRequest request, String concreteIndex, long version, final Map<String, Object> source, XContentType sourceContentType, @Nullable final BytesReference sourceAsBytes) {
|
public GetResult extractGetResult(final UpdateRequest request, String concreteIndex, long version, final Map<String, Object> source,
|
||||||
|
XContentType sourceContentType, @Nullable final BytesReference sourceAsBytes) {
|
||||||
if ((request.fields() == null || request.fields().length == 0) &&
|
if ((request.fields() == null || request.fields().length == 0) &&
|
||||||
(request.fetchSource() == null || request.fetchSource().fetchSource() == false)) {
|
(request.fetchSource() == null || request.fetchSource().fetchSource() == false)) {
|
||||||
return null;
|
return null;
|
||||||
|
@ -278,7 +366,8 @@ public class UpdateHelper extends AbstractComponent {
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO when using delete/none, we can still return the source as bytes by generating it (using the sourceContentType)
|
// TODO when using delete/none, we can still return the source as bytes by generating it (using the sourceContentType)
|
||||||
return new GetResult(concreteIndex, request.type(), request.id(), version, true, sourceRequested ? sourceFilteredAsBytes : null, fields);
|
return new GetResult(concreteIndex, request.type(), request.id(), version, true,
|
||||||
|
sourceRequested ? sourceFilteredAsBytes : null, fields);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class Result {
|
public static class Result {
|
||||||
|
@ -288,7 +377,8 @@ public class UpdateHelper extends AbstractComponent {
|
||||||
private final Map<String, Object> updatedSourceAsMap;
|
private final Map<String, Object> updatedSourceAsMap;
|
||||||
private final XContentType updateSourceContentType;
|
private final XContentType updateSourceContentType;
|
||||||
|
|
||||||
public Result(Streamable action, DocWriteResponse.Result result, Map<String, Object> updatedSourceAsMap, XContentType updateSourceContentType) {
|
public Result(Streamable action, DocWriteResponse.Result result, Map<String, Object> updatedSourceAsMap,
|
||||||
|
XContentType updateSourceContentType) {
|
||||||
this.action = action;
|
this.action = action;
|
||||||
this.result = result;
|
this.result = result;
|
||||||
this.updatedSourceAsMap = updatedSourceAsMap;
|
this.updatedSourceAsMap = updatedSourceAsMap;
|
||||||
|
@ -313,4 +403,58 @@ public class UpdateHelper extends AbstractComponent {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* After executing the script, this is the type of operation that will be used for subsequent actions. This corresponds to the "ctx.op"
|
||||||
|
* variable inside of scripts.
|
||||||
|
*/
|
||||||
|
enum UpdateOpType {
|
||||||
|
CREATE("create"),
|
||||||
|
INDEX("index"),
|
||||||
|
DELETE("delete"),
|
||||||
|
NONE("none");
|
||||||
|
|
||||||
|
private final String name;
|
||||||
|
|
||||||
|
UpdateOpType(String name) {
|
||||||
|
this.name = name;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static UpdateOpType lenientFromString(String operation, Logger logger, String scriptId) {
|
||||||
|
switch (operation) {
|
||||||
|
case "create":
|
||||||
|
return UpdateOpType.CREATE;
|
||||||
|
case "index":
|
||||||
|
return UpdateOpType.INDEX;
|
||||||
|
case "delete":
|
||||||
|
return UpdateOpType.DELETE;
|
||||||
|
case "none":
|
||||||
|
return UpdateOpType.NONE;
|
||||||
|
default:
|
||||||
|
// TODO: can we remove this leniency yet??
|
||||||
|
logger.warn("Used upsert operation [{}] for script [{}], doing nothing...", operation, scriptId);
|
||||||
|
return UpdateOpType.NONE;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return name;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Field names used to populate the script context
|
||||||
|
*/
|
||||||
|
public static class ContextFields {
|
||||||
|
public static final String CTX = "ctx";
|
||||||
|
public static final String OP = "op";
|
||||||
|
public static final String SOURCE = "_source";
|
||||||
|
public static final String NOW = "_now";
|
||||||
|
public static final String INDEX = "_index";
|
||||||
|
public static final String TYPE = "_type";
|
||||||
|
public static final String ID = "_id";
|
||||||
|
public static final String VERSION = "_version";
|
||||||
|
public static final String ROUTING = "_routing";
|
||||||
|
public static final String PARENT = "_parent";
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,6 +20,8 @@
|
||||||
package org.elasticsearch.action.update;
|
package org.elasticsearch.action.update;
|
||||||
|
|
||||||
import org.elasticsearch.Version;
|
import org.elasticsearch.Version;
|
||||||
|
import org.elasticsearch.action.DocWriteResponse;
|
||||||
|
import org.elasticsearch.action.delete.DeleteRequest;
|
||||||
import org.elasticsearch.action.index.IndexRequest;
|
import org.elasticsearch.action.index.IndexRequest;
|
||||||
import org.elasticsearch.action.support.replication.ReplicationRequest;
|
import org.elasticsearch.action.support.replication.ReplicationRequest;
|
||||||
import org.elasticsearch.common.bytes.BytesArray;
|
import org.elasticsearch.common.bytes.BytesArray;
|
||||||
|
@ -34,6 +36,8 @@ import org.elasticsearch.common.xcontent.XContentParser;
|
||||||
import org.elasticsearch.common.xcontent.XContentType;
|
import org.elasticsearch.common.xcontent.XContentType;
|
||||||
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||||
import org.elasticsearch.env.Environment;
|
import org.elasticsearch.env.Environment;
|
||||||
|
import org.elasticsearch.index.VersionType;
|
||||||
|
import org.elasticsearch.index.get.GetField;
|
||||||
import org.elasticsearch.index.get.GetResult;
|
import org.elasticsearch.index.get.GetResult;
|
||||||
import org.elasticsearch.index.shard.ShardId;
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
import org.elasticsearch.script.MockScriptEngine;
|
import org.elasticsearch.script.MockScriptEngine;
|
||||||
|
@ -50,6 +54,7 @@ import org.junit.Before;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
@ -92,6 +97,16 @@ public class UpdateRequestTests extends ESTestCase {
|
||||||
source.put("update_timestamp", ctx.get("_now"));
|
source.put("update_timestamp", ctx.get("_now"));
|
||||||
return null;
|
return null;
|
||||||
});
|
});
|
||||||
|
scripts.put(
|
||||||
|
"ctx._source.body = \"foo\"",
|
||||||
|
vars -> {
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
final Map<String, Object> ctx = (Map<String, Object>) vars.get("ctx");
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
final Map<String, Object> source = (Map<String, Object>) ctx.get("_source");
|
||||||
|
source.put("body", "foo");
|
||||||
|
return null;
|
||||||
|
});
|
||||||
scripts.put(
|
scripts.put(
|
||||||
"ctx._timestamp = ctx._now",
|
"ctx._timestamp = ctx._now",
|
||||||
vars -> {
|
vars -> {
|
||||||
|
@ -108,6 +123,22 @@ public class UpdateRequestTests extends ESTestCase {
|
||||||
ctx.put("op", "delete");
|
ctx.put("op", "delete");
|
||||||
return null;
|
return null;
|
||||||
});
|
});
|
||||||
|
scripts.put(
|
||||||
|
"ctx.op = bad",
|
||||||
|
vars -> {
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
final Map<String, Object> ctx = (Map<String, Object>) vars.get("ctx");
|
||||||
|
ctx.put("op", "bad");
|
||||||
|
return null;
|
||||||
|
});
|
||||||
|
scripts.put(
|
||||||
|
"ctx.op = none",
|
||||||
|
vars -> {
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
final Map<String, Object> ctx = (Map<String, Object>) vars.get("ctx");
|
||||||
|
ctx.put("op", "none");
|
||||||
|
return null;
|
||||||
|
});
|
||||||
scripts.put("return", vars -> null);
|
scripts.put("return", vars -> null);
|
||||||
final ScriptContextRegistry scriptContextRegistry = new ScriptContextRegistry(emptyList());
|
final ScriptContextRegistry scriptContextRegistry = new ScriptContextRegistry(emptyList());
|
||||||
final MockScriptEngine engine = new MockScriptEngine("mock", scripts);
|
final MockScriptEngine engine = new MockScriptEngine("mock", scripts);
|
||||||
|
@ -502,4 +533,131 @@ public class UpdateRequestTests extends ESTestCase {
|
||||||
updateRequest.upsert(new IndexRequest("index", "type", "1").version(1L));
|
updateRequest.upsert(new IndexRequest("index", "type", "1").version(1L));
|
||||||
assertThat(updateRequest.validate().validationErrors(), contains("can't provide version in upsert request"));
|
assertThat(updateRequest.validate().validationErrors(), contains("can't provide version in upsert request"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testParentAndRoutingExtraction() throws Exception {
|
||||||
|
GetResult getResult = new GetResult("test", "type", "1", 0, false, null, null);
|
||||||
|
IndexRequest indexRequest = new IndexRequest("test", "type", "1");
|
||||||
|
|
||||||
|
// There is no routing and parent because the document doesn't exist
|
||||||
|
assertNull(UpdateHelper.calculateRouting(getResult, null));
|
||||||
|
assertNull(UpdateHelper.calculateParent(getResult, null));
|
||||||
|
|
||||||
|
// There is no routing and parent the indexing request
|
||||||
|
assertNull(UpdateHelper.calculateRouting(getResult, indexRequest));
|
||||||
|
assertNull(UpdateHelper.calculateParent(getResult, indexRequest));
|
||||||
|
|
||||||
|
// Doc exists but has no source or fields
|
||||||
|
getResult = new GetResult("test", "type", "1", 0, true, null, null);
|
||||||
|
|
||||||
|
// There is no routing and parent on either request
|
||||||
|
assertNull(UpdateHelper.calculateRouting(getResult, indexRequest));
|
||||||
|
assertNull(UpdateHelper.calculateParent(getResult, indexRequest));
|
||||||
|
|
||||||
|
Map<String, GetField> fields = new HashMap<>();
|
||||||
|
fields.put("_parent", new GetField("_parent", Collections.singletonList("parent1")));
|
||||||
|
fields.put("_routing", new GetField("_routing", Collections.singletonList("routing1")));
|
||||||
|
|
||||||
|
// Doc exists and has the parent and routing fields
|
||||||
|
getResult = new GetResult("test", "type", "1", 0, true, null, fields);
|
||||||
|
|
||||||
|
// Use the get result parent and routing
|
||||||
|
assertThat(UpdateHelper.calculateRouting(getResult, indexRequest), equalTo("routing1"));
|
||||||
|
assertThat(UpdateHelper.calculateParent(getResult, indexRequest), equalTo("parent1"));
|
||||||
|
|
||||||
|
// Index request has overriding parent and routing values
|
||||||
|
indexRequest = new IndexRequest("test", "type", "1").parent("parent2").routing("routing2");
|
||||||
|
|
||||||
|
// Use the request's parent and routing
|
||||||
|
assertThat(UpdateHelper.calculateRouting(getResult, indexRequest), equalTo("routing2"));
|
||||||
|
assertThat(UpdateHelper.calculateParent(getResult, indexRequest), equalTo("parent2"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("deprecated") // VersionType.FORCE is deprecated
|
||||||
|
public void testCalculateUpdateVersion() throws Exception {
|
||||||
|
long randomVersion = randomIntBetween(0, 100);
|
||||||
|
GetResult getResult = new GetResult("test", "type", "1", randomVersion, true, new BytesArray("{}"), null);
|
||||||
|
|
||||||
|
UpdateRequest request = new UpdateRequest("test", "type1", "1");
|
||||||
|
long version = UpdateHelper.calculateUpdateVersion(request, getResult);
|
||||||
|
|
||||||
|
// Use the get result's version
|
||||||
|
assertThat(version, equalTo(randomVersion));
|
||||||
|
|
||||||
|
request = new UpdateRequest("test", "type1", "1").versionType(VersionType.FORCE).version(1337);
|
||||||
|
version = UpdateHelper.calculateUpdateVersion(request, getResult);
|
||||||
|
|
||||||
|
// Use the forced update request version
|
||||||
|
assertThat(version, equalTo(1337L));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testNoopDetection() throws Exception {
|
||||||
|
ShardId shardId = new ShardId("test", "", 0);
|
||||||
|
GetResult getResult = new GetResult("test", "type", "1", 0, true,
|
||||||
|
new BytesArray("{\"body\": \"foo\"}"),
|
||||||
|
null);
|
||||||
|
|
||||||
|
UpdateRequest request = new UpdateRequest("test", "type1", "1").fromXContent(
|
||||||
|
createParser(JsonXContent.jsonXContent, new BytesArray("{\"doc\": {\"body\": \"foo\"}}")));
|
||||||
|
|
||||||
|
UpdateHelper.Result result = updateHelper.prepareUpdateIndexRequest(shardId, request, getResult, true);
|
||||||
|
|
||||||
|
assertThat(result.action(), instanceOf(UpdateResponse.class));
|
||||||
|
assertThat(result.getResponseResult(), equalTo(DocWriteResponse.Result.NOOP));
|
||||||
|
|
||||||
|
// Try again, with detectNoop turned off
|
||||||
|
result = updateHelper.prepareUpdateIndexRequest(shardId, request, getResult, false);
|
||||||
|
assertThat(result.action(), instanceOf(IndexRequest.class));
|
||||||
|
assertThat(result.getResponseResult(), equalTo(DocWriteResponse.Result.UPDATED));
|
||||||
|
assertThat(result.updatedSourceAsMap().get("body").toString(), equalTo("foo"));
|
||||||
|
|
||||||
|
// Change the request to be a different doc
|
||||||
|
request = new UpdateRequest("test", "type1", "1").fromXContent(
|
||||||
|
createParser(JsonXContent.jsonXContent, new BytesArray("{\"doc\": {\"body\": \"bar\"}}")));
|
||||||
|
result = updateHelper.prepareUpdateIndexRequest(shardId, request, getResult, true);
|
||||||
|
|
||||||
|
assertThat(result.action(), instanceOf(IndexRequest.class));
|
||||||
|
assertThat(result.getResponseResult(), equalTo(DocWriteResponse.Result.UPDATED));
|
||||||
|
assertThat(result.updatedSourceAsMap().get("body").toString(), equalTo("bar"));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testUpdateScript() throws Exception {
|
||||||
|
ShardId shardId = new ShardId("test", "", 0);
|
||||||
|
GetResult getResult = new GetResult("test", "type", "1", 0, true,
|
||||||
|
new BytesArray("{\"body\": \"bar\"}"),
|
||||||
|
null);
|
||||||
|
|
||||||
|
UpdateRequest request = new UpdateRequest("test", "type1", "1")
|
||||||
|
.script(mockInlineScript("ctx._source.body = \"foo\""));
|
||||||
|
|
||||||
|
UpdateHelper.Result result = updateHelper.prepareUpdateScriptRequest(shardId, request, getResult,
|
||||||
|
ESTestCase::randomNonNegativeLong);
|
||||||
|
|
||||||
|
assertThat(result.action(), instanceOf(IndexRequest.class));
|
||||||
|
assertThat(result.getResponseResult(), equalTo(DocWriteResponse.Result.UPDATED));
|
||||||
|
assertThat(result.updatedSourceAsMap().get("body").toString(), equalTo("foo"));
|
||||||
|
|
||||||
|
// Now where the script changes the op to "delete"
|
||||||
|
request = new UpdateRequest("test", "type1", "1").script(mockInlineScript("ctx.op = delete"));
|
||||||
|
|
||||||
|
result = updateHelper.prepareUpdateScriptRequest(shardId, request, getResult,
|
||||||
|
ESTestCase::randomNonNegativeLong);
|
||||||
|
|
||||||
|
assertThat(result.action(), instanceOf(DeleteRequest.class));
|
||||||
|
assertThat(result.getResponseResult(), equalTo(DocWriteResponse.Result.DELETED));
|
||||||
|
|
||||||
|
// We treat everything else as a No-op
|
||||||
|
boolean goodNoop = randomBoolean();
|
||||||
|
if (goodNoop) {
|
||||||
|
request = new UpdateRequest("test", "type1", "1").script(mockInlineScript("ctx.op = none"));
|
||||||
|
} else {
|
||||||
|
request = new UpdateRequest("test", "type1", "1").script(mockInlineScript("ctx.op = bad"));
|
||||||
|
}
|
||||||
|
|
||||||
|
result = updateHelper.prepareUpdateScriptRequest(shardId, request, getResult,
|
||||||
|
ESTestCase::randomNonNegativeLong);
|
||||||
|
|
||||||
|
assertThat(result.action(), instanceOf(UpdateResponse.class));
|
||||||
|
assertThat(result.getResponseResult(), equalTo(DocWriteResponse.Result.NOOP));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue