diff --git a/docs/reference/indices/flush.asciidoc b/docs/reference/indices/flush.asciidoc index 75b935ab089..0b9ba5a3f4e 100644 --- a/docs/reference/indices/flush.asciidoc +++ b/docs/reference/indices/flush.asciidoc @@ -14,6 +14,30 @@ $ curl -XPOST 'http://localhost:9200/twitter/_flush' -------------------------------------------------- [float] +[[flush-parameters]] +=== Request Parameters + +The flush API accepts the following request parameters: + +[horizontal] +`wait_if_ongoing`:: If set to `true` the flush operation will block until the +flush can be executed if another flush operation is already executing. +The default is `false` and will cause an exception to be thrown on +the shard level if another flush operation is already running. coming[1.4.0] + +`full`:: If set to `true` a new index writer is created and settings that have +been changed related to the index writer will be refreshed. Note: if a full flush +is required for a setting to take effect this will be part of the settings update +process and it not required to be executed by the user. +(This setting can be considered as internal) + +`force`:: Whether a flush should be forced even if it is not necessarily needed ie. +if no changes will be committed to the index. This is useful if transaction log IDs +should be incremented even if no uncommitted changes are present. +(This setting can be considered as internal) + +[float] +[[flush-multi-index]] === Multi Index The flush API can be applied to more than one index with a single call, diff --git a/rest-api-spec/api/indices.flush.json b/rest-api-spec/api/indices.flush.json index 4d1a115a1e5..87450915961 100644 --- a/rest-api-spec/api/indices.flush.json +++ b/rest-api-spec/api/indices.flush.json @@ -20,6 +20,10 @@ "type" : "boolean", "description" : "If set to true a new index writer is created and settings that have been changed related to the index writer will be refreshed. Note: if a full flush is required for a setting to take effect this will be part of the settings update process and it not required to be executed by the user. (This setting can be considered as internal)" }, + "wait_if_ongoing": { + "type" : "boolean", + "description" : "If set to true the flush operation will block until the flush can be executed if another flush operation is already executing. The default is false and will cause an exception to be thrown on the shard level if another flush operation is already running." + }, "ignore_unavailable": { "type" : "boolean", "description" : "Whether specified concrete indices should be ignored when unavailable (missing or closed)" diff --git a/src/main/java/org/elasticsearch/action/admin/indices/flush/FlushRequest.java b/src/main/java/org/elasticsearch/action/admin/indices/flush/FlushRequest.java index 9deadc10759..25d9ae1c78a 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/flush/FlushRequest.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/flush/FlushRequest.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.admin.indices.flush; +import org.elasticsearch.Version; import org.elasticsearch.action.support.broadcast.BroadcastOperationRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -40,6 +41,7 @@ public class FlushRequest extends BroadcastOperationRequest { private boolean force = false; private boolean full = false; + private boolean waitIfOngoing = false; FlushRequest() { @@ -68,6 +70,23 @@ public class FlushRequest extends BroadcastOperationRequest { return this; } + /** + * Returns true iff a flush should block + * if a another flush operation is already running. Otherwise false + */ + public boolean waitIfOngoing() { + return this.waitIfOngoing; + } + + /** + * if set to true the flush will block + * if a another flush operation is already running until the flush can be performed. + */ + public FlushRequest waitIfOngoing(boolean waitIfOngoing) { + this.waitIfOngoing = waitIfOngoing; + return this; + } + /** * Force flushing, even if one is possibly not needed. */ @@ -88,6 +107,9 @@ public class FlushRequest extends BroadcastOperationRequest { super.writeTo(out); out.writeBoolean(full); out.writeBoolean(force); + if (out.getVersion().onOrAfter(Version.V_1_4_0)) { + out.writeBoolean(waitIfOngoing); + } } @Override @@ -95,5 +117,11 @@ public class FlushRequest extends BroadcastOperationRequest { super.readFrom(in); full = in.readBoolean(); force = in.readBoolean(); + if (in.getVersion().onOrAfter(Version.V_1_4_0)) { + waitIfOngoing = in.readBoolean(); + } else { + waitIfOngoing = false; + } } + } diff --git a/src/main/java/org/elasticsearch/action/admin/indices/flush/FlushRequestBuilder.java b/src/main/java/org/elasticsearch/action/admin/indices/flush/FlushRequestBuilder.java index 3937313917f..31d4d2918b3 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/flush/FlushRequestBuilder.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/flush/FlushRequestBuilder.java @@ -46,4 +46,9 @@ public class FlushRequestBuilder extends BroadcastOperationRequestBuilder listener) { client.flush(request, listener); } + + public FlushRequestBuilder setWaitIfOngoing(boolean waitIfOngoing) { + request.waitIfOngoing(waitIfOngoing); + return this; + } } diff --git a/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java b/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java index f98ea265991..0ffc7a0c218 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.admin.indices.flush; +import org.elasticsearch.Version; import org.elasticsearch.action.support.broadcast.BroadcastShardOperationRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -32,6 +33,7 @@ class ShardFlushRequest extends BroadcastShardOperationRequest { private boolean full; private boolean force; + private boolean waitIfOngoing; ShardFlushRequest() { } @@ -40,6 +42,7 @@ class ShardFlushRequest extends BroadcastShardOperationRequest { super(index, shardId, request); this.full = request.full(); this.force = request.force(); + this.waitIfOngoing = request.waitIfOngoing(); } public boolean full() { @@ -50,11 +53,20 @@ class ShardFlushRequest extends BroadcastShardOperationRequest { return this.force; } + public boolean waitIfOngoing() { + return waitIfOngoing; + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); full = in.readBoolean(); force = in.readBoolean(); + if (in.getVersion().onOrAfter(Version.V_1_4_0)) { + waitIfOngoing = in.readBoolean(); + } else { + waitIfOngoing = false; + } } @Override @@ -62,5 +74,8 @@ class ShardFlushRequest extends BroadcastShardOperationRequest { super.writeTo(out); out.writeBoolean(full); out.writeBoolean(force); + if (out.getVersion().onOrAfter(Version.V_1_4_0)) { + out.writeBoolean(waitIfOngoing); + } } } diff --git a/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportFlushAction.java b/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportFlushAction.java index 987faa00f4f..de95576b7b2 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportFlushAction.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportFlushAction.java @@ -107,7 +107,8 @@ public class TransportFlushAction extends TransportBroadcastOperationAction(channel) { @Override public RestResponse buildResponse(FlushResponse response, XContentBuilder builder) throws Exception { diff --git a/src/test/java/org/elasticsearch/index/store/CorruptedFileTest.java b/src/test/java/org/elasticsearch/index/store/CorruptedFileTest.java index e65b5653608..8c89d532d40 100644 --- a/src/test/java/org/elasticsearch/index/store/CorruptedFileTest.java +++ b/src/test/java/org/elasticsearch/index/store/CorruptedFileTest.java @@ -131,7 +131,7 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest { } indexRandom(true, builders); ensureGreen(); - assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).execute().actionGet()); + assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).setWaitIfOngoing(true).execute().actionGet()); // we have to flush at least once here since we don't corrupt the translog CountResponse countResponse = client().prepareCount().get(); assertHitCount(countResponse, numDocs); @@ -234,7 +234,7 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest { } indexRandom(true, builders); ensureGreen(); - assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).execute().actionGet()); + assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).setWaitIfOngoing(true).execute().actionGet()); // we have to flush at least once here since we don't corrupt the translog CountResponse countResponse = client().prepareCount().get(); assertHitCount(countResponse, numDocs); @@ -323,7 +323,7 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest { } indexRandom(true, builders); ensureGreen(); - assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).execute().actionGet()); + assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).setWaitIfOngoing(true).execute().actionGet()); // we have to flush at least once here since we don't corrupt the translog CountResponse countResponse = client().prepareCount().get(); assertHitCount(countResponse, numDocs); @@ -405,7 +405,7 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest { } indexRandom(true, builders); ensureGreen(); - assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).execute().actionGet()); + assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).setWaitIfOngoing(true).execute().actionGet()); // we have to flush at least once here since we don't corrupt the translog CountResponse countResponse = client().prepareCount().get(); assertHitCount(countResponse, numDocs); diff --git a/src/test/java/org/elasticsearch/indices/FlushTest.java b/src/test/java/org/elasticsearch/indices/FlushTest.java new file mode 100644 index 00000000000..6d064922c16 --- /dev/null +++ b/src/test/java/org/elasticsearch/indices/FlushTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.indices; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.flush.FlushResponse; +import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.junit.Test; + +import java.util.Arrays; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAllSuccessful; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.emptyIterable; +import static org.hamcrest.Matchers.equalTo; + +public class FlushTest extends ElasticsearchIntegrationTest { + + @Test + public void testWaitIfOngoing() throws InterruptedException { + createIndex("test"); + ensureGreen("test"); + final int numIters = scaledRandomIntBetween(10, 30); + for (int i = 0; i < numIters; i++) { + for (int j = 0; j < 10; j++) { + client().prepareIndex("test", "test").setSource("{}").get(); + } + final CountDownLatch latch = new CountDownLatch(10); + final CopyOnWriteArrayList errors = new CopyOnWriteArrayList<>(); + for (int j = 0; j < 10; j++) { + client().admin().indices().prepareFlush("test").setWaitIfOngoing(true).execute(new ActionListener() { + @Override + public void onResponse(FlushResponse flushResponse) { + try { + // dont' use assertAllSuccesssful it uses a randomized context that belongs to a different thread + assertThat("Unexpected ShardFailures: " + Arrays.toString(flushResponse.getShardFailures()), flushResponse.getFailedShards(), equalTo(0)); + latch.countDown(); + } catch (Throwable ex) { + onFailure(ex); + } + + } + + @Override + public void onFailure(Throwable e) { + errors.add(e); + latch.countDown(); + } + }); + } + latch.await(); + assertThat(errors, emptyIterable()); + } + } +}