Update: Detect noop updates sent with doc_as_upsert

This should help prevent spurious updates that just cause extra writing
and cache invalidation for no real reason.

Close #6822
This commit is contained in:
Nik Everett 2014-07-14 12:43:29 -04:00 committed by Adrien Grand
parent ebcc1e0bf5
commit 79433d23e3
10 changed files with 355 additions and 14 deletions

View File

@ -109,6 +109,23 @@ curl -XPOST 'localhost:9200/test/type1/1/_update' -d '{
If both `doc` and `script` is specified, then `doc` is ignored. Best is
to put your field pairs of the partial document in the script itself.
By default if `doc` is specified then the document is always updated even
if the merging process doesn't cause any changes. Specifying `detect_noop`
as `true` will cause Elasticsearch to check if there are changes and, if
there aren't, turn the update request into a noop. For example:
[source,js]
--------------------------------------------------
curl -XPOST 'localhost:9200/test/type1/1/_update' -d '{
"doc" : {
"name" : "new_name"
},
"detect_noop": true
}'
--------------------------------------------------
If `name` was `new_name` before the request was sent then the entire update
request is ignored.
There is also support for `upsert`. If the document does
not already exists, the content of the `upsert` element will be used to
index the fresh doc:

View File

@ -550,6 +550,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
}
case NONE:
UpdateResponse updateResponse = translate.action();
indexShard.indexingService().noopUpdate(updateRequest.type());
return new UpdateResult(translate, updateResponse);
default:
throw new ElasticsearchIllegalStateException("Illegal update operation " + translate.operation());

View File

@ -56,6 +56,7 @@ import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.engine.DocumentAlreadyExistsException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -70,16 +71,18 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
private final AutoCreateIndex autoCreateIndex;
private final TransportCreateIndexAction createIndexAction;
private final UpdateHelper updateHelper;
private final IndicesService indicesService;
@Inject
public TransportUpdateAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService,
TransportIndexAction indexAction, TransportDeleteAction deleteAction, TransportCreateIndexAction createIndexAction,
UpdateHelper updateHelper, ActionFilters actionFilters) {
UpdateHelper updateHelper, ActionFilters actionFilters, IndicesService indicesService) {
super(settings, UpdateAction.NAME, threadPool, clusterService, transportService, actionFilters);
this.indexAction = indexAction;
this.deleteAction = deleteAction;
this.createIndexAction = createIndexAction;
this.updateHelper = updateHelper;
this.indicesService = indicesService;
this.autoCreateIndex = new AutoCreateIndex(settings);
}
@ -281,6 +284,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
case NONE:
UpdateResponse update = result.action();
listener.onResponse(update);
indicesService.indexService(request.index()).shard(request.shardId()).indexingService().noopUpdate(request.type());
break;
default:
throw new ElasticsearchIllegalStateException("Illegal operation " + result.operation());

View File

@ -140,7 +140,13 @@ public class UpdateHelper extends AbstractComponent {
if (indexRequest.parent() != null) {
parent = indexRequest.parent();
}
XContentHelper.update(updatedSourceAsMap, indexRequest.sourceAsMap());
boolean noop = !XContentHelper.update(updatedSourceAsMap, indexRequest.sourceAsMap(), request.detectNoop());
// noop could still be true even if detectNoop isn't because update detects empty maps as noops. BUT we can only
// actually turn the update into a noop if detectNoop is true to preserve backwards compatibility and to handle
// cases where users repopulating multi-fields or adding synonyms, etc.
if (request.detectNoop() && noop) {
operation = "none";
}
} else {
Map<String, Object> ctx = new HashMap<>(2);
ctx.put("_source", sourceAndContent.v2());
@ -196,7 +202,7 @@ public class UpdateHelper extends AbstractComponent {
return new Result(deleteRequest, Operation.DELETE, updatedSourceAsMap, updateSourceContentType);
} else if ("none".equals(operation)) {
UpdateResponse update = new UpdateResponse(getResult.getIndex(), getResult.getType(), getResult.getId(), getResult.getVersion(), false);
update.setGetResult(extractGetResult(request, getResult.getVersion(), updatedSourceAsMap, updateSourceContentType, null));
update.setGetResult(extractGetResult(request, getResult.getVersion(), updatedSourceAsMap, updateSourceContentType, getResult.internalSourceRef()));
return new Result(update, Operation.NONE, updatedSourceAsMap, updateSourceContentType);
} else {
logger.warn("Used update operation [{}] for script [{}], doing nothing...", operation, request.script);

View File

@ -77,6 +77,7 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
private IndexRequest upsertRequest;
private boolean docAsUpsert = false;
private boolean detectNoop = false;
@Nullable
private IndexRequest doc;
@ -561,6 +562,19 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
return source(new BytesArray(source, offset, length));
}
/**
* Should this update attempt to detect if it is a noop?
* @return this for chaining
*/
public UpdateRequest detectNoop(boolean detectNoop) {
this.detectNoop = detectNoop;
return this;
}
public boolean detectNoop() {
return detectNoop;
}
public UpdateRequest source(BytesReference source) throws Exception {
XContentType xContentType = XContentFactory.xContentType(source);
try (XContentParser parser = XContentFactory.xContent(xContentType).createParser(source)) {
@ -588,6 +602,8 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
safeDoc().source(docBuilder);
} else if ("doc_as_upsert".equals(currentFieldName)) {
docAsUpsert(parser.booleanValue());
} else if ("detect_noop".equals(currentFieldName)) {
detectNoop(parser.booleanValue());
}
}
}
@ -640,6 +656,9 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
docAsUpsert = in.readBoolean();
version = Versions.readVersion(in);
versionType = VersionType.fromValue(in.readByte());
if (in.getVersion().onOrAfter(Version.V_1_3_0)) {
detectNoop = in.readBoolean();
}
}
@Override
@ -689,6 +708,9 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
out.writeBoolean(docAsUpsert);
Versions.writeVersion(version, out);
out.writeByte(versionType.getValue());
if (out.getVersion().onOrAfter(Version.V_1_3_0)) {
out.writeBoolean(detectNoop);
}
}
}

View File

@ -346,6 +346,14 @@ public class UpdateRequestBuilder extends InstanceShardOperationRequestBuilder<U
return this;
}
/**
* Sets whether to perform extra effort to detect noop updates via docAsUpsert.
*/
public UpdateRequestBuilder setDetectNoop(boolean detectNoop) {
request.detectNoop(detectNoop);
return this;
}
@Override
protected void doExecute(ActionListener<UpdateResponse> listener) {
client.update(request, listener);

View File

@ -184,22 +184,40 @@ public class XContentHelper {
/**
* Updates the provided changes into the source. If the key exists in the changes, it overrides the one in source
* unless both are Maps, in which case it recuersively updated it.
* @param source the original map to be updated
* @param changes the changes to update into updated
* @param checkUpdatesAreUnequal should this method check if updates to the same key (that are not both maps) are
* unequal? This is just a .equals check on the objects, but that can take some time on long strings.
* @return true if the source map was modified
*/
public static void update(Map<String, Object> source, Map<String, Object> changes) {
public static boolean update(Map<String, Object> source, Map<String, Object> changes, boolean checkUpdatesAreUnequal) {
boolean modified = false;
for (Map.Entry<String, Object> changesEntry : changes.entrySet()) {
if (!source.containsKey(changesEntry.getKey())) {
// safe to copy, change does not exist in source
source.put(changesEntry.getKey(), changesEntry.getValue());
} else {
if (source.get(changesEntry.getKey()) instanceof Map && changesEntry.getValue() instanceof Map) {
// recursive merge maps
update((Map<String, Object>) source.get(changesEntry.getKey()), (Map<String, Object>) changesEntry.getValue());
} else {
// update the field
source.put(changesEntry.getKey(), changesEntry.getValue());
}
modified = true;
continue;
}
Object old = source.get(changesEntry.getKey());
if (old instanceof Map && changesEntry.getValue() instanceof Map) {
// recursive merge maps
modified |= update((Map<String, Object>) source.get(changesEntry.getKey()),
(Map<String, Object>) changesEntry.getValue(), checkUpdatesAreUnequal && !modified);
continue;
}
// update the field
source.put(changesEntry.getKey(), changesEntry.getValue());
if (modified) {
continue;
}
if (!checkUpdatesAreUnequal || old == null) {
modified = true;
continue;
}
modified = !old.equals(changesEntry.getValue());
}
return modified;
}
/**

View File

@ -19,6 +19,7 @@
package org.elasticsearch.index.indexing;
import org.elasticsearch.Version;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -46,17 +47,20 @@ public class IndexingStats implements Streamable, ToXContent {
private long deleteTimeInMillis;
private long deleteCurrent;
private long noopUpdateCount;
Stats() {
}
public Stats(long indexCount, long indexTimeInMillis, long indexCurrent, long deleteCount, long deleteTimeInMillis, long deleteCurrent) {
public Stats(long indexCount, long indexTimeInMillis, long indexCurrent, long deleteCount, long deleteTimeInMillis, long deleteCurrent, long noopUpdateCount) {
this.indexCount = indexCount;
this.indexTimeInMillis = indexTimeInMillis;
this.indexCurrent = indexCurrent;
this.deleteCount = deleteCount;
this.deleteTimeInMillis = deleteTimeInMillis;
this.deleteCurrent = deleteCurrent;
this.noopUpdateCount = noopUpdateCount;
}
public void add(Stats stats) {
@ -67,6 +71,8 @@ public class IndexingStats implements Streamable, ToXContent {
deleteCount += stats.deleteCount;
deleteTimeInMillis += stats.deleteTimeInMillis;
deleteCurrent += stats.deleteCurrent;
noopUpdateCount += stats.noopUpdateCount;
}
public long getIndexCount() {
@ -101,6 +107,10 @@ public class IndexingStats implements Streamable, ToXContent {
return deleteCurrent;
}
public long getNoopUpdateCount() {
return noopUpdateCount;
}
public static Stats readStats(StreamInput in) throws IOException {
Stats stats = new Stats();
stats.readFrom(in);
@ -116,6 +126,10 @@ public class IndexingStats implements Streamable, ToXContent {
deleteCount = in.readVLong();
deleteTimeInMillis = in.readVLong();
deleteCurrent = in.readVLong();
if (in.getVersion().onOrAfter(Version.V_1_3_0)) {
noopUpdateCount = in.readVLong();
}
}
@Override
@ -127,6 +141,10 @@ public class IndexingStats implements Streamable, ToXContent {
out.writeVLong(deleteCount);
out.writeVLong(deleteTimeInMillis);
out.writeVLong(deleteCurrent);
if (out.getVersion().onOrAfter(Version.V_1_3_0)) {
out.writeVLong(noopUpdateCount);
}
}
@Override
@ -139,6 +157,8 @@ public class IndexingStats implements Streamable, ToXContent {
builder.timeValueField(Fields.DELETE_TIME_IN_MILLIS, Fields.DELETE_TIME, deleteTimeInMillis);
builder.field(Fields.DELETE_CURRENT, deleteCurrent);
builder.field(Fields.NOOP_UPDATE_TOTAL, noopUpdateCount);
return builder;
}
}
@ -218,6 +238,7 @@ public class IndexingStats implements Streamable, ToXContent {
static final XContentBuilderString DELETE_TIME = new XContentBuilderString("delete_time");
static final XContentBuilderString DELETE_TIME_IN_MILLIS = new XContentBuilderString("delete_time_in_millis");
static final XContentBuilderString DELETE_CURRENT = new XContentBuilderString("delete_current");
static final XContentBuilderString NOOP_UPDATE_TOTAL = new XContentBuilderString("noop_update_total");
}
public static IndexingStats readIndexingStats(StreamInput in) throws IOException {

View File

@ -218,6 +218,11 @@ public class ShardIndexingService extends AbstractIndexShardComponent {
}
}
public void noopUpdate(String type) {
totalStats.noopUpdates.inc();
typeStats(type).noopUpdates.inc();
}
public void clear() {
totalStats.clear();
synchronized (this) {
@ -253,11 +258,13 @@ public class ShardIndexingService extends AbstractIndexShardComponent {
public final MeanMetric deleteMetric = new MeanMetric();
public final CounterMetric indexCurrent = new CounterMetric();
public final CounterMetric deleteCurrent = new CounterMetric();
public final CounterMetric noopUpdates = new CounterMetric();
public IndexingStats.Stats stats() {
return new IndexingStats.Stats(
indexMetric.count(), TimeUnit.NANOSECONDS.toMillis(indexMetric.sum()), indexCurrent.count(),
deleteMetric.count(), TimeUnit.NANOSECONDS.toMillis(deleteMetric.sum()), deleteCurrent.count());
deleteMetric.count(), TimeUnit.NANOSECONDS.toMillis(deleteMetric.sum()), deleteCurrent.count(),
noopUpdates.count());
}
public long totalCurrent() {

View File

@ -0,0 +1,237 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.update;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import static org.hamcrest.Matchers.notNullValue;
/**
* Tests for noop updates.
*/
public class UpdateNoopTests extends ElasticsearchIntegrationTest {
@Test
public void singleField() throws Exception {
updateAndCheckSource(1, fields("bar", "baz"));
updateAndCheckSource(1, fields("bar", "baz"));
updateAndCheckSource(2, fields("bar", "bir"));
updateAndCheckSource(2, fields("bar", "bir"));
updateAndCheckSource(3, fields("bar", "foo"));
assertEquals(2, totalNoopUpdates());
}
@Test
public void twoFields() throws Exception {
// Use random keys so we get random iteration order.
String key1 = 1 + randomAsciiOfLength(3);
String key2 = 2 + randomAsciiOfLength(3);
updateAndCheckSource(1, fields(key1, "foo", key2, "baz"));
updateAndCheckSource(1, fields(key1, "foo", key2, "baz"));
updateAndCheckSource(2, fields(key1, "foo", key2, "bir"));
updateAndCheckSource(2, fields(key1, "foo", key2, "bir"));
updateAndCheckSource(3, fields(key1, "foo", key2, "foo"));
assertEquals(2, totalNoopUpdates());
}
@Test
public void arrayField() throws Exception {
updateAndCheckSource(1, fields("bar", "baz"));
updateAndCheckSource(2, fields("bar", new String[] {"baz", "bort"}));
updateAndCheckSource(2, fields("bar", new String[] {"baz", "bort"}));
updateAndCheckSource(3, fields("bar", "bir"));
updateAndCheckSource(3, fields("bar", "bir"));
updateAndCheckSource(4, fields("bar", new String[] {"baz", "bort"}));
updateAndCheckSource(4, fields("bar", new String[] {"baz", "bort"}));
updateAndCheckSource(5, fields("bar", new String[] {"bir", "bort"}));
updateAndCheckSource(5, fields("bar", new String[] {"bir", "bort"}));
updateAndCheckSource(6, fields("bar", new String[] {"bir", "for"}));
updateAndCheckSource(6, fields("bar", new String[] {"bir", "for"}));
updateAndCheckSource(7, fields("bar", new String[] {"bir", "for", "far"}));
assertEquals(5, totalNoopUpdates());
}
@Test
public void map() throws Exception {
// Use random keys so we get variable iteration order.
String key1 = 1 + randomAsciiOfLength(3);
String key2 = 2 + randomAsciiOfLength(3);
updateAndCheckSource(1, XContentFactory.jsonBuilder().startObject()
.startObject("test")
.field(key1, "foo")
.field(key2, "baz")
.endObject().endObject());
updateAndCheckSource(1, XContentFactory.jsonBuilder().startObject()
.startObject("test")
.field(key1, "foo")
.field(key2, "baz")
.endObject().endObject());
updateAndCheckSource(2, XContentFactory.jsonBuilder().startObject()
.startObject("test")
.field(key1, "foo")
.field(key2, "bir")
.endObject().endObject());
updateAndCheckSource(2, XContentFactory.jsonBuilder().startObject()
.startObject("test")
.field(key1, "foo")
.field(key2, "bir")
.endObject().endObject());
updateAndCheckSource(3, XContentFactory.jsonBuilder().startObject()
.startObject("test")
.field(key1, "foo")
.field(key2, "foo")
.endObject().endObject());
assertEquals(2, totalNoopUpdates());
}
@Test
public void mapAndField() throws Exception {
updateAndCheckSource(1, XContentFactory.jsonBuilder().startObject()
.field("f", "foo")
.startObject("m")
.field("mf1", "foo")
.field("mf2", "baz")
.endObject()
.endObject());
updateAndCheckSource(1, XContentFactory.jsonBuilder().startObject()
.field("f", "foo")
.startObject("m")
.field("mf1", "foo")
.field("mf2", "baz")
.endObject()
.endObject());
updateAndCheckSource(2, XContentFactory.jsonBuilder().startObject()
.field("f", "foo")
.startObject("m")
.field("mf1", "foo")
.field("mf2", "bir")
.endObject()
.endObject());
updateAndCheckSource(2, XContentFactory.jsonBuilder().startObject()
.field("f", "foo")
.startObject("m")
.field("mf1", "foo")
.field("mf2", "bir")
.endObject()
.endObject());
updateAndCheckSource(3, XContentFactory.jsonBuilder().startObject()
.field("f", "foo")
.startObject("m")
.field("mf1", "foo")
.field("mf2", "foo")
.endObject()
.endObject());
updateAndCheckSource(4, XContentFactory.jsonBuilder().startObject()
.field("f", "bar")
.startObject("m")
.field("mf1", "foo")
.field("mf2", "foo")
.endObject()
.endObject());
updateAndCheckSource(4, XContentFactory.jsonBuilder().startObject()
.field("f", "bar")
.startObject("m")
.field("mf1", "foo")
.field("mf2", "foo")
.endObject()
.endObject());
updateAndCheckSource(5, XContentFactory.jsonBuilder().startObject()
.field("f", "baz")
.startObject("m")
.field("mf1", "foo")
.field("mf2", "foo")
.endObject()
.endObject());
updateAndCheckSource(6, XContentFactory.jsonBuilder().startObject()
.field("f", "bop")
.startObject("m")
.field("mf1", "foo")
.field("mf2", "foo")
.endObject()
.endObject());
assertEquals(3, totalNoopUpdates());
}
/**
* Totally empty requests are noop if and only if detect noops is true.
*/
@Test
public void totallyEmpty() throws Exception {
updateAndCheckSource(1, XContentFactory.jsonBuilder().startObject()
.field("f", "foo")
.startObject("m")
.field("mf1", "foo")
.field("mf2", "baz")
.endObject()
.endObject());
update(true, 1, XContentFactory.jsonBuilder().startObject().endObject());
update(false, 2, XContentFactory.jsonBuilder().startObject().endObject());
}
private XContentBuilder fields(Object... fields) throws ElasticsearchException, IOException {
assertEquals("Fields must field1, value1, field2, value2, etc", 0, fields.length % 2);
XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
for (int i = 0; i < fields.length; i += 2) {
builder.field((String) fields[i], fields[i + 1]);
}
builder.endObject();
return builder;
}
private void updateAndCheckSource(long expectedVersion, XContentBuilder xContentBuilder) {
UpdateResponse updateResponse = update(true, expectedVersion, xContentBuilder);
assertEquals(updateResponse.getGetResult().sourceRef().toUtf8(), xContentBuilder.bytes().toUtf8());
}
private UpdateResponse update(boolean detectNoop, long expectedVersion, XContentBuilder xContentBuilder) {
UpdateResponse updateResponse = client().prepareUpdate("test", "type1", "1")
.setDoc(xContentBuilder.bytes().toUtf8())
.setDocAsUpsert(true)
.setDetectNoop(detectNoop)
.setFields("_source")
.execute().actionGet();
assertThat(updateResponse.getGetResult(), notNullValue());
assertEquals(expectedVersion, updateResponse.getVersion());
return updateResponse;
}
private long totalNoopUpdates() {
return client().admin().indices().prepareStats("test").setIndexing(true).get().getIndex("test").getTotal().getIndexing().getTotal()
.getNoopUpdateCount();
}
@Before
public void setup() {
createIndex("test");
ensureGreen();
}
}