Add `wait_if_ongoing` option to _flush requests

This commit adds the ability to force blocking on the flush operaition
to make sure all files have been written and synced to disk. Without
this option a flush might be executing at the same time causing the
current flush to fail and return before all files being synced.

Closes #6996
This commit is contained in:
Simon Willnauer 2014-07-23 21:59:26 +02:00
parent 127649d174
commit bd51d7a07f
9 changed files with 156 additions and 5 deletions

View File

@ -14,6 +14,30 @@ $ curl -XPOST 'http://localhost:9200/twitter/_flush'
--------------------------------------------------
[float]
[[flush-parameters]]
=== Request Parameters
The flush API accepts the following request parameters:
[horizontal]
`wait_if_ongoing`:: If set to `true` the flush operation will block until the
flush can be executed if another flush operation is already executing.
The default is `false` and will cause an exception to be thrown on
the shard level if another flush operation is already running. coming[1.4.0]
`full`:: If set to `true` a new index writer is created and settings that have
been changed related to the index writer will be refreshed. Note: if a full flush
is required for a setting to take effect this will be part of the settings update
process and it not required to be executed by the user.
(This setting can be considered as internal)
`force`:: Whether a flush should be forced even if it is not necessarily needed ie.
if no changes will be committed to the index. This is useful if transaction log IDs
should be incremented even if no uncommitted changes are present.
(This setting can be considered as internal)
[float]
[[flush-multi-index]]
=== Multi Index
The flush API can be applied to more than one index with a single call,

View File

@ -20,6 +20,10 @@
"type" : "boolean",
"description" : "If set to true a new index writer is created and settings that have been changed related to the index writer will be refreshed. Note: if a full flush is required for a setting to take effect this will be part of the settings update process and it not required to be executed by the user. (This setting can be considered as internal)"
},
"wait_if_ongoing": {
"type" : "boolean",
"description" : "If set to true the flush operation will block until the flush can be executed if another flush operation is already executing. The default is false and will cause an exception to be thrown on the shard level if another flush operation is already running."
},
"ignore_unavailable": {
"type" : "boolean",
"description" : "Whether specified concrete indices should be ignored when unavailable (missing or closed)"

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action.admin.indices.flush;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.broadcast.BroadcastOperationRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -40,6 +41,7 @@ public class FlushRequest extends BroadcastOperationRequest<FlushRequest> {
private boolean force = false;
private boolean full = false;
private boolean waitIfOngoing = false;
FlushRequest() {
@ -68,6 +70,23 @@ public class FlushRequest extends BroadcastOperationRequest<FlushRequest> {
return this;
}
/**
* Returns <tt>true</tt> iff a flush should block
* if a another flush operation is already running. Otherwise <tt>false</tt>
*/
public boolean waitIfOngoing() {
return this.waitIfOngoing;
}
/**
* if set to <tt>true</tt> the flush will block
* if a another flush operation is already running until the flush can be performed.
*/
public FlushRequest waitIfOngoing(boolean waitIfOngoing) {
this.waitIfOngoing = waitIfOngoing;
return this;
}
/**
* Force flushing, even if one is possibly not needed.
*/
@ -88,6 +107,9 @@ public class FlushRequest extends BroadcastOperationRequest<FlushRequest> {
super.writeTo(out);
out.writeBoolean(full);
out.writeBoolean(force);
if (out.getVersion().onOrAfter(Version.V_1_4_0)) {
out.writeBoolean(waitIfOngoing);
}
}
@Override
@ -95,5 +117,11 @@ public class FlushRequest extends BroadcastOperationRequest<FlushRequest> {
super.readFrom(in);
full = in.readBoolean();
force = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_1_4_0)) {
waitIfOngoing = in.readBoolean();
} else {
waitIfOngoing = false;
}
}
}

View File

@ -46,4 +46,9 @@ public class FlushRequestBuilder extends BroadcastOperationRequestBuilder<FlushR
protected void doExecute(ActionListener<FlushResponse> listener) {
client.flush(request, listener);
}
public FlushRequestBuilder setWaitIfOngoing(boolean waitIfOngoing) {
request.waitIfOngoing(waitIfOngoing);
return this;
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action.admin.indices.flush;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -32,6 +33,7 @@ class ShardFlushRequest extends BroadcastShardOperationRequest {
private boolean full;
private boolean force;
private boolean waitIfOngoing;
ShardFlushRequest() {
}
@ -40,6 +42,7 @@ class ShardFlushRequest extends BroadcastShardOperationRequest {
super(index, shardId, request);
this.full = request.full();
this.force = request.force();
this.waitIfOngoing = request.waitIfOngoing();
}
public boolean full() {
@ -50,11 +53,20 @@ class ShardFlushRequest extends BroadcastShardOperationRequest {
return this.force;
}
public boolean waitIfOngoing() {
return waitIfOngoing;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
full = in.readBoolean();
force = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_1_4_0)) {
waitIfOngoing = in.readBoolean();
} else {
waitIfOngoing = false;
}
}
@Override
@ -62,5 +74,8 @@ class ShardFlushRequest extends BroadcastShardOperationRequest {
super.writeTo(out);
out.writeBoolean(full);
out.writeBoolean(force);
if (out.getVersion().onOrAfter(Version.V_1_4_0)) {
out.writeBoolean(waitIfOngoing);
}
}
}

View File

@ -107,7 +107,8 @@ public class TransportFlushAction extends TransportBroadcastOperationAction<Flus
@Override
protected ShardFlushResponse shardOperation(ShardFlushRequest request) throws ElasticsearchException {
IndexShard indexShard = indicesService.indexServiceSafe(request.index()).shardSafe(request.shardId());
indexShard.flush(new Engine.Flush().type(request.full() ? Engine.Flush.Type.NEW_WRITER : Engine.Flush.Type.COMMIT_TRANSLOG).force(request.force()));
indexShard.flush(new Engine.Flush().waitIfOngoing(request.waitIfOngoing()).
type(request.full() ? Engine.Flush.Type.NEW_WRITER : Engine.Flush.Type.COMMIT_TRANSLOG).force(request.force()));
return new ShardFlushResponse(request.index(), request.shardId());
}

View File

@ -57,6 +57,7 @@ public class RestFlushAction extends BaseRestHandler {
flushRequest.indicesOptions(IndicesOptions.fromRequest(request, flushRequest.indicesOptions()));
flushRequest.full(request.paramAsBoolean("full", flushRequest.full()));
flushRequest.force(request.paramAsBoolean("force", flushRequest.force()));
flushRequest.waitIfOngoing(request.paramAsBoolean("wait_if_ongoing", flushRequest.waitIfOngoing()));
client.admin().indices().flush(flushRequest, new RestBuilderListener<FlushResponse>(channel) {
@Override
public RestResponse buildResponse(FlushResponse response, XContentBuilder builder) throws Exception {

View File

@ -131,7 +131,7 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest {
}
indexRandom(true, builders);
ensureGreen();
assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).execute().actionGet());
assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).setWaitIfOngoing(true).execute().actionGet());
// we have to flush at least once here since we don't corrupt the translog
CountResponse countResponse = client().prepareCount().get();
assertHitCount(countResponse, numDocs);
@ -234,7 +234,7 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest {
}
indexRandom(true, builders);
ensureGreen();
assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).execute().actionGet());
assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).setWaitIfOngoing(true).execute().actionGet());
// we have to flush at least once here since we don't corrupt the translog
CountResponse countResponse = client().prepareCount().get();
assertHitCount(countResponse, numDocs);
@ -323,7 +323,7 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest {
}
indexRandom(true, builders);
ensureGreen();
assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).execute().actionGet());
assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).setWaitIfOngoing(true).execute().actionGet());
// we have to flush at least once here since we don't corrupt the translog
CountResponse countResponse = client().prepareCount().get();
assertHitCount(countResponse, numDocs);
@ -405,7 +405,7 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest {
}
indexRandom(true, builders);
ensureGreen();
assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).execute().actionGet());
assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).setWaitIfOngoing(true).execute().actionGet());
// we have to flush at least once here since we don't corrupt the translog
CountResponse countResponse = client().prepareCount().get();
assertHitCount(countResponse, numDocs);

View File

@ -0,0 +1,73 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test;
import java.util.Arrays;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAllSuccessful;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.emptyIterable;
import static org.hamcrest.Matchers.equalTo;
public class FlushTest extends ElasticsearchIntegrationTest {
@Test
public void testWaitIfOngoing() throws InterruptedException {
createIndex("test");
ensureGreen("test");
final int numIters = scaledRandomIntBetween(10, 30);
for (int i = 0; i < numIters; i++) {
for (int j = 0; j < 10; j++) {
client().prepareIndex("test", "test").setSource("{}").get();
}
final CountDownLatch latch = new CountDownLatch(10);
final CopyOnWriteArrayList<Throwable> errors = new CopyOnWriteArrayList<>();
for (int j = 0; j < 10; j++) {
client().admin().indices().prepareFlush("test").setWaitIfOngoing(true).execute(new ActionListener<FlushResponse>() {
@Override
public void onResponse(FlushResponse flushResponse) {
try {
// dont' use assertAllSuccesssful it uses a randomized context that belongs to a different thread
assertThat("Unexpected ShardFailures: " + Arrays.toString(flushResponse.getShardFailures()), flushResponse.getFailedShards(), equalTo(0));
latch.countDown();
} catch (Throwable ex) {
onFailure(ex);
}
}
@Override
public void onFailure(Throwable e) {
errors.add(e);
latch.countDown();
}
});
}
latch.await();
assertThat(errors, emptyIterable());
}
}
}